rshkv commented on code in PR #863:
URL: https://github.com/apache/iceberg-rust/pull/863#discussion_r1901273157
########## crates/iceberg/src/arrow/schema.rs: ##########

@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
 get_parquet_stat_as_datum!(max);
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+    use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::*;
+    use arrow_array::{ArrayRef, Datum as ArrowDatum};
+    use arrow_schema::{DataType, TimeUnit};
+    use ordered_float::OrderedFloat;
+
+    use crate::spec::{Literal, PrimitiveLiteral};
+    use crate::{Error, ErrorKind};
+
+    /// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
+    /// compile-time when types are determined dynamically (e.g. based on some column type).
+    /// A [DataType] is given at construction time and is later used to downcast the inner
+    /// array builder and the provided values.
+    pub(crate) struct AnyArrayBuilder {
+        data_type: DataType,
+        inner: Box<dyn ArrayBuilder>,
+    }
+
+    impl AnyArrayBuilder {
+        pub(crate) fn new(data_type: &DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                inner: make_builder(data_type, 0),
+            }
+        }
+
+        pub(crate) fn finish(&mut self) -> ArrayRef {
+            self.inner.finish()
+        }
+
+        /// Append an [`arrow_array::Datum`] value.
+        pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
+            let (array, is_scalar) = value.get();
+            assert!(is_scalar, "Can only append scalar datum");
+
+            match array.data_type() {

Review Comment:
   This list is exhaustive based on the `ArrowSchemaVisitor::primitive` function above, i.e. every type produced there is covered here.
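   To make the intended usage concrete, a minimal sketch of driving `AnyArrayBuilder` with a runtime-chosen `DataType`, based on the API shown in the hunk above (the `example` function and the `Int64` choice are illustrative, not part of the PR; this assumes crate-internal context, since the builder is `pub(crate)`):

```rust
use arrow_array::{ArrayRef, Int64Array};
use arrow_schema::DataType;

// Pick the builder from a runtime DataType, then append scalar Arrow datums.
// `Int64Array::new_scalar` wraps a single value as an `arrow_array::Datum`.
fn example() -> crate::Result<ArrayRef> {
    let mut builder = AnyArrayBuilder::new(&DataType::Int64);
    builder.append_datum(&Int64Array::new_scalar(42))?;
    builder.append_datum(&Int64Array::new_scalar(7))?;
    Ok(builder.finish()) // an Int64 array of length 2
}
```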
########## crates/iceberg/src/scan.rs: ##########

@@ -1084,6 +1087,50 @@ pub mod tests {
             .record_count(1)
             .partition(Struct::from_iter([Some(Literal::long(100))]))
             .key_metadata(None)
+            // Note:
+            // The bounds below need to agree with the test data written below
+            // into the Parquet file. If not, tests that rely on filter scans
+            // fail because of wrong bounds.
+            .lower_bounds(HashMap::from([
+                (1, Datum::long(1)),
+                (2, Datum::long(2)),
+                (3, Datum::long(3)),
+                (4, Datum::string("Apache")),
+                (5, Datum::double(100)),
+                (6, Datum::int(100)),
+                (7, Datum::long(100)),
+                (8, Datum::bool(false)),
+                (9, Datum::float(100.0)),
+                // decimal values are not supported by schema::get_arrow_datum
+                // (10, Datum::decimal(Decimal(123, 2))),
+                (11, Datum::date(0)),
+                (12, Datum::timestamp_micros(0)),
+                (13, Datum::timestamptz_micros(0)),
+                // ns timestamps, uuid, fixed, binary are currently not
+                // supported in schema::get_arrow_datum
+            ]))
+            .upper_bounds(HashMap::from([
+                (1, Datum::long(1)),
+                (2, Datum::long(5)),
+                (3, Datum::long(4)),
+                (4, Datum::string("Iceberg")),
+                (5, Datum::double(200)),
+                (6, Datum::int(200)),
+                (7, Datum::long(200)),
+                (8, Datum::bool(true)),
+                (9, Datum::float(200.0)),
+                // decimal values are not supported by schema::get_arrow_datum
+                // (10, Datum::decimal(Decimal(123, 2))),
+                (11, Datum::date(0)),
+                (12, Datum::timestamp_micros(0)),
+                (13, Datum::timestamptz_micros(0)),
+                // ns timestamps, uuid, fixed, binary are currently not
+                // supported in schema::get_arrow_datum

Review Comment:
   We could add support, but I thought that might be for another PR.

########## crates/iceberg/src/scan.rs: ##########

@@ -1084,6 +1087,50 @@ pub mod tests {
             .record_count(1)
             .partition(Struct::from_iter([Some(Literal::long(100))]))
             .key_metadata(None)
+            // Note:
+            // The bounds below need to agree with the test data written below
+            // into the Parquet file. If not, tests that rely on filter scans
+            // fail because of wrong bounds.
+            .lower_bounds(HashMap::from([
+                (1, Datum::long(1)),
+                (2, Datum::long(2)),
+                (3, Datum::long(3)),
+                (4, Datum::string("Apache")),
+                (5, Datum::double(100)),
+                (6, Datum::int(100)),
+                (7, Datum::long(100)),
+                (8, Datum::bool(false)),
+                (9, Datum::float(100.0)),
+                // decimal values are not supported by schema::get_arrow_datum
+                // (10, Datum::decimal(Decimal(123, 2))),
+                (11, Datum::date(0)),
+                (12, Datum::timestamp_micros(0)),
+                (13, Datum::timestamptz_micros(0)),
+                // ns timestamps, uuid, fixed, binary are currently not
+                // supported in schema::get_arrow_datum
+            ]))
+            .upper_bounds(HashMap::from([
+                (1, Datum::long(1)),
+                (2, Datum::long(5)),
+                (3, Datum::long(4)),
+                (4, Datum::string("Iceberg")),
+                (5, Datum::double(200)),
+                (6, Datum::int(200)),
+                (7, Datum::long(200)),
+                (8, Datum::bool(true)),
+                (9, Datum::float(200.0)),
+                // decimal values are not supported by schema::get_arrow_datum
+                // (10, Datum::decimal(Decimal(123, 2))),
+                (11, Datum::date(0)),
+                (12, Datum::timestamp_micros(0)),
+                (13, Datum::timestamptz_micros(0)),
+                // ns timestamps, uuid, fixed, binary are currently not
+                // supported in schema::get_arrow_datum
+            ]))
+            .column_sizes(HashMap::from([(1, 1u64), (2, 1u64)]))
+            .value_counts(HashMap::from([(1, 2u64), (2, 2u64)]))
+            .null_value_counts(HashMap::from([(1, 3u64), (2, 3u64)]))
+            .nan_value_counts(HashMap::from([(1, 4u64), (2, 4u64)]))

Review Comment:
   These values aren't based on the test data, but I wanted to have these fields reflected in the tests.

########## crates/iceberg/src/metadata_scan.rs: ##########

@@ -128,6 +141,84 @@ impl<'a> SnapshotsTable<'a> {
     }
 }
+/// Entries table containing the manifest file's entries.
+///
+/// The table has one row for each manifest file entry in the current snapshot's manifest list file.
+/// For reference, see the Java implementation of [`ManifestEntry`][1].
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+pub struct EntriesTable<'a> {
+    table: &'a Table,
+}
+
+impl<'a> EntriesTable<'a> {
+    /// Get the schema for the manifest entries table.
+    pub fn schema(&self) -> Schema {
+        Schema::new(vec![

Review Comment:
   The `status` column holds enum values. In Java, the `status` column is `i32` [(here)](https://github.com/apache/iceberg/blob/4a432839233f2343a9eae8255532f911f06358ef/core/src/main/java/org/apache/iceberg/ManifestEntry.java#L45), but in Python this is [u8](https://github.com/apache/iceberg-python/blob/a051584a3684392d2db6556449eb299145d47d15/pyiceberg/table/inspect.py#L121). My preference would be `u8`, but I'm treating the Java implementation as authoritative.
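   For illustration, the two candidate typings side by side as Arrow fields, per the links above (a sketch; the helper function and field nullability are hypothetical, the field name `status` follows the Java/Python implementations):

```rust
use arrow_schema::{DataType, Field};

// Sketch of the two candidate Arrow types for the `status` column.
fn status_field_options() -> (Field, Field) {
    // Following Java's ManifestEntry, where status is a plain int:
    let java_style = Field::new("status", DataType::Int32, false);
    // Following PyIceberg, which narrows status to an unsigned byte:
    let python_style = Field::new("status", DataType::UInt8, false);
    (java_style, python_style)
}
```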
########## crates/iceberg/src/arrow/schema.rs: ##########

@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
 get_parquet_stat_as_datum!(max);
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+    use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::*;
+    use arrow_array::{ArrayRef, Datum as ArrowDatum};
+    use arrow_schema::{DataType, TimeUnit};
+    use ordered_float::OrderedFloat;
+
+    use crate::spec::{Literal, PrimitiveLiteral};
+    use crate::{Error, ErrorKind};
+
+    /// A helper wrapping [ArrayBuilder] for building arrays without declaring the inner type at
+    /// compile-time when types are determined dynamically (e.g. based on some column type).
+    /// A [DataType] is given at construction time and is later used to downcast the inner
+    /// array builder and the provided values.
+    pub(crate) struct AnyArrayBuilder {
+        data_type: DataType,
+        inner: Box<dyn ArrayBuilder>,
+    }
+
+    impl AnyArrayBuilder {
+        pub(crate) fn new(data_type: &DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                inner: make_builder(data_type, 0),
+            }
+        }
+
+        pub(crate) fn finish(&mut self) -> ArrayRef {
+            self.inner.finish()
+        }
+
+        /// Append an [`arrow_array::Datum`] value.
+        pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> crate::Result<()> {
+            let (array, is_scalar) = value.get();
+            assert!(is_scalar, "Can only append scalar datum");
+
+            match array.data_type() {
+                DataType::Boolean => self
+                    .builder::<BooleanBuilder>()?
+                    .append_value(array.as_boolean().value(0)),
+                DataType::Int32 => self
+                    .builder::<Int32Builder>()?
+                    .append_value(array.as_primitive::<Int32Type>().value(0)),
+                DataType::Int64 => self
+                    .builder::<Int64Builder>()?
+                    .append_value(array.as_primitive::<Int64Type>().value(0)),
+                DataType::Float32 => self
+                    .builder::<Float32Builder>()?
+                    .append_value(array.as_primitive::<Float32Type>().value(0)),
+                DataType::Float64 => self
+                    .builder::<Float64Builder>()?
+                    .append_value(array.as_primitive::<Float64Type>().value(0)),
+                DataType::Decimal128(_, _) => self
+                    .builder::<Decimal128Builder>()?
+                    .append_value(array.as_primitive::<Decimal128Type>().value(0)),
+                DataType::Date32 => self
+                    .builder::<Date32Builder>()?
+                    .append_value(array.as_primitive::<Date32Type>().value(0)),
+                DataType::Time64(TimeUnit::Microsecond) => self
+                    .builder::<Time64MicrosecondBuilder>()?
+                    .append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Microsecond, _) => self
+                    .builder::<TimestampMicrosecondBuilder>()?
+                    .append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => self
+                    .builder::<TimestampNanosecondBuilder>()?
+                    .append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),

Review Comment:
   I understand it's correct to ignore the timezone here, because the timezone isn't captured in the builder.
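   A small sketch of why ignoring the timezone is sound, assuming arrow-rs's `with_timezone` on timestamp arrays (the function and values are illustrative):

```rust
use arrow_array::builder::TimestampMicrosecondBuilder;

// Timestamp arrays store plain i64 microseconds since the UTC epoch; the
// timezone lives only on the DataType and can be attached after building.
fn timezone_is_metadata() {
    let mut builder = TimestampMicrosecondBuilder::new();
    builder.append_value(0); // 1970-01-01T00:00:00Z
    let array = builder.finish().with_timezone("UTC");
    assert_eq!(array.value(0), 0); // the stored value is unchanged
}
```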
########## crates/iceberg/src/table.rs: ##########

@@ -203,7 +203,7 @@ impl Table {
     /// Creates a metadata table which provides table-like APIs for inspecting metadata.
     /// See [`MetadataTable`] for more details.
-    pub fn metadata_table(self) -> MetadataTable {
+    pub fn metadata_table(&self) -> MetadataTable<'_> {

Review Comment:
   Addressing this comment: https://github.com/apache/iceberg-rust/pull/822#discussion_r1899617821. I prefer this, but we don't _need_ to do it here.
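   A minimal sketch of what the borrowed signature buys (the `inspect` function is illustrative):

```rust
use iceberg::table::Table;

// With `&self`, creating a metadata table no longer consumes the `Table`,
// so the same table can be inspected more than once and used afterwards.
fn inspect(table: &Table) {
    let _first = table.metadata_table(); // borrows `table`
    let _second = table.metadata_table(); // fine: `table` was never moved
}
```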
########## crates/iceberg/src/metadata_scan.rs: ##########

@@ -255,8 +345,515 @@ impl<'a> ManifestsTable<'a> {
     }
 }
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
+///
+/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+    // Reference to table metadata to retrieve partition specs based on partition spec ids
+    table_metadata: &'a TableMetadata,
+    // Below are the field builders of the "data_file" struct
+    content: Int8Builder,
+    file_path: StringBuilder,
+    file_format: StringBuilder,
+    partition: PartitionValuesStructBuilder,
+    // The count types in the Java and PyIceberg implementations are i64; however, the values
+    // coming from the deserialized data files are u64. We follow the former and cast to i64.
+    record_count: Int64Builder,
+    file_size_in_bytes: Int64Builder,
+    column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+    value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    key_metadata: BinaryBuilder,
+    split_offsets: ListBuilder<Int64Builder>,
+    equality_ids: ListBuilder<Int32Builder>,
+    sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+    fn new(table_metadata: &'a TableMetadata) -> Self {
+        Self {
+            table_metadata,
+            content: Int8Builder::new(),
+            file_path: StringBuilder::new(),
+            file_format: StringBuilder::new(),
+            partition: PartitionValuesStructBuilder::new(table_metadata),
+            record_count: Int64Builder::new(),
+            file_size_in_bytes: Int64Builder::new(),
+            column_sizes: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            null_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            nan_value_counts: MapBuilder::new(None, Int32Builder::new(), Int64Builder::new()),
+            lower_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+            upper_bounds: MapBuilder::new(None, Int32Builder::new(), BinaryBuilder::new()),
+            key_metadata: BinaryBuilder::new(),
+            split_offsets: ListBuilder::new(Int64Builder::new()),
+            equality_ids: ListBuilder::new(Int32Builder::new()),
+            sort_order_ids: Int32Builder::new(),
+        }
+    }
+
+    fn fields(table_metadata: &TableMetadata) -> Fields {
+        vec![
+            Field::new("content", DataType::Int8, false),
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new("file_format", DataType::Utf8, false),
+            Field::new(
+                "partition",
+                DataType::Struct(PartitionValuesStructBuilder::combined_partition_fields(
+                    table_metadata,
+                )),
+                false,
+            ),
+            Field::new("record_count", DataType::Int64, false),
+            Field::new("file_size_in_bytes", DataType::Int64, false),
+            Field::new(
+                "column_sizes",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "null_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "nan_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "lower_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new(
+                "upper_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new("key_metadata", DataType::Binary, true),
+            Field::new(
+                "split_offsets",
+                DataType::new_list(DataType::Int64, true),
+                true,
+            ),
+            Field::new(
+                "equality_ids",
+                DataType::new_list(DataType::Int32, true),
+                true,
+            ),
+            Field::new("sort_order_id", DataType::Int32, true),
+        ]
+        .into()
+    }
+
+    /// Construct a new map type that maps from column ids (i32) to the provided value type.
+    /// Keys and the entries struct are non-nullable; values are nullable.
+    fn column_id_to_value_type(value_type: DataType) -> DataType {
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(
+                    vec![
+                        Field::new("keys", DataType::Int32, false),
+                        Field::new("values", value_type, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            false,
+        )
+    }
+
+    fn append(&mut self, manifest_file: &ManifestFile, data_file: &DataFile) -> Result<()> {
+        self.content.append_value(data_file.content as i8);
+        self.file_path.append_value(data_file.file_path());
+        self.file_format
+            .append_value(data_file.file_format().to_string().to_uppercase());
+        self.partition.append(
+            self.partition_spec(manifest_file)?.clone().fields(),
+            data_file.partition(),
+        )?;
+        self.record_count
+            .append_value(data_file.record_count() as i64);
+        self.file_size_in_bytes
+            .append_value(data_file.file_size_in_bytes() as i64);

Review Comment:
   The casting is slightly annoying given we're dealing with non-negative types, but the Python and Java implementations use i64.
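   On the casting: a hedged alternative sketch using a checked conversion instead of `as`, in case wrap-around on out-of-range u64 values is ever a concern (illustrative, not what the PR does; assumes the crate's `Error::new(ErrorKind, message)` constructor):

```rust
use crate::{Error, ErrorKind};

// Checked u64 -> i64 conversion: errors instead of silently wrapping for
// values above i64::MAX.
fn to_i64(value: u64, field: &str) -> Result<i64, Error> {
    i64::try_from(value).map_err(|_| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("{field} value {value} overflows i64"),
        )
    })
}
```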
+/// +/// [1]: https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java +struct DataFileStructBuilder<'a> { + // Reference to table metadata to retrieve partition specs based on partition spec ids + table_metadata: &'a TableMetadata, + // Below are the field builders of the "data_file" struct + content: Int8Builder, + file_path: StringBuilder, + file_format: StringBuilder, + partition: PartitionValuesStructBuilder, + // The count types in the Java and PyIceberg implementation i64 however the values coming from + // the deserialized data files are u64. We agree with the latter to avoid casting. + record_count: Int64Builder, + file_size_in_bytes: Int64Builder, + column_sizes: MapBuilder<Int32Builder, Int64Builder>, + value_counts: MapBuilder<Int32Builder, Int64Builder>, + null_value_counts: MapBuilder<Int32Builder, Int64Builder>, + nan_value_counts: MapBuilder<Int32Builder, Int64Builder>, + lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>, + upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>, + key_metadata: BinaryBuilder, + split_offsets: ListBuilder<Int64Builder>, + equality_ids: ListBuilder<Int32Builder>, + sort_order_ids: Int32Builder, +} + +impl<'a> DataFileStructBuilder<'a> { + fn new(table_metadata: &'a TableMetadata) -> Self { + Self { + table_metadata, + content: Int8Builder::new(), + file_path: StringBuilder::new(), + file_format: StringBuilder::new(), + partition: PartitionValuesStructBuilder::new(table_metadata), + record_count: Int64Builder::new(), Review Comment: The `manifests` table merged in #861 prefers using `PrimitiveBuilder::new()` which works because `Int64Builder` is just `PrimitiveBuilder<Int64Type>`. I prefer saying `Int64Builder` here to be explicit about the type but happy to change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org