Xuanwo commented on code in PR #765: URL: https://github.com/apache/iceberg-rust/pull/765#discussion_r1885019802
########## crates/iceberg/src/puffin/compression.rs: ########## @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +use serde::{Deserialize, Serialize}; + use crate::{Error, ErrorKind, Result}; -#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] /// Data compression formats Review Comment: I usually prefer to keep comments at the top before `#[derive]` to make them easier to find. This also provides more space for writing lengthy comments, which may include examples. ```rust /// Data compression formats #[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] ``` ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct 
BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. 
+ // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { + 
Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) Review Comment: The same, we can use `FileMetadata::MAGIC_LENGTH as usize` ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. 
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: 
&(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> 
{ + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; Review Comment: Please note that we usually need to avoid reading small bytes from cloud storage. Therefore, we will need something like prefetching, which will read a large amount of data capable of holding the entire footer. We can implement this optimization in a separate PR. ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. 
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: 
&(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { Review Comment: Let's do early return: ``` if bytes == FileMetadata::MAGIC { return Ok(()) } ... ``` ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
+ #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. 
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = 
file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { Review Comment: If this error doesn't occur often, how about writing it out directly at the call sites instead of going through a helper? I believe we won't need to change it frequently. I tend to optimize for reading code. ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct 
BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. 
+ // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { + 
Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result<String> { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { Review Comment: Let's use `ok_or_else` to simplify the code ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
+ #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { Review Comment: Will we have other flags? ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct 
BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. 
+ // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { + 
Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result<String> { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(data) => Ok(data), + }?; + let decompressed_footer_payload_bytes = + footer_compression_codec.decompress(footer_payload_bytes.into())?; + + match String::from_utf8(decompressed_footer_payload_bytes) { Review Comment: Let's use `map_err` to simplify the code ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
+ #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. 
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); Review Comment: We can use `FileMetadata::FOOTER_STRUCT_LENGTH as u64` for simplicity ########## 
crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. 
+ #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. 
+/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. + // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = 
file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { + Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result<String> { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = 
usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(data) => Ok(data), + }?; + let decompressed_footer_payload_bytes = + footer_compression_codec.decompress(footer_payload_bytes.into())?; + + match String::from_utf8(decompressed_footer_payload_bytes) { + Err(src) => Err(Error::new( + ErrorKind::DataInvalid, + "Footer is not a valid UTF-8 string", + ) + .with_source(src)), + Ok(str) => Ok(str), + } + } + + fn from_json_str(string: &str) -> Result<FileMetadata> { + match serde_json::from_str::<FileMetadata>(string) { + Ok(file_metadata) => Ok(file_metadata), + Err(src) => Err( + Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON") + .with_source(src), + ), + } + } + + #[rustfmt::skip] Review Comment: Hi, it's better to let `rustfmt` do its work. ########## crates/iceberg/src/puffin/metadata.rs: ########## @@ -0,0 +1,809 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::{HashMap, HashSet}; + +use bytes::Bytes; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +use crate::io::{FileRead, InputFile}; +use crate::puffin::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Human-readable identification of the application writing the file, along with its version. +/// Example: "Trino version 381" +pub(crate) const CREATED_BY_PROPERTY: &str = "created-by"; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metadata about a blob. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#blobmetadata +pub(crate) struct BlobMetadata { + /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types + pub(crate) r#type: String, + /// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob. + #[serde(rename = "fields")] + pub(crate) input_fields: Vec<i32>, + /// ID of the Iceberg table's snapshot the blob was computed from + pub(crate) snapshot_id: i64, + /// Sequence number of the Iceberg table's snapshot the blob was computed from + pub(crate) sequence_number: i64, + /// The offset in the file where the blob contents start + pub(crate) offset: u64, + /// The length of the blob stored in the file (after compression, if compressed) + pub(crate) length: usize, + /// The compression codec used to compress the data + #[serde(skip_serializing_if = "CompressionCodec::is_none")] + #[serde(default)] + pub(crate) compression_codec: CompressionCodec, + /// Arbitrary meta-information about the blob + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + pub(crate) properties: HashMap<String, String>, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(crate) enum Flag { + FooterPayloadCompressed, +} + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct ByteNumber(pub u8); + +#[derive(PartialEq, Eq, Hash)] +pub(crate) struct 
BitNumber(pub u8); + +static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert( + ( + Flag::FooterPayloadCompressed.byte_number(), + Flag::FooterPayloadCompressed.bit_number(), + ), + Flag::FooterPayloadCompressed, + ); + m +}); + +impl Flag { + pub(crate) fn byte_number(&self) -> ByteNumber { + match self { + Flag::FooterPayloadCompressed => ByteNumber(0), + } + } + + pub(crate) fn bit_number(&self) -> BitNumber { + match self { + Flag::FooterPayloadCompressed => BitNumber(0), + } + } + + fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> { + FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned() + } +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +/// Metadata about a puffin file. +/// For more information, see: https://iceberg.apache.org/puffin-spec/#filemetadata +pub(crate) struct FileMetadata { + /// Metadata about blobs in file + pub(crate) blobs: Vec<BlobMetadata>, + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + /// Arbitrary meta-information, like writer identification/version. + pub(crate) properties: HashMap<String, String>, +} + +impl FileMetadata { + pub(crate) const MAGIC_LENGTH: u8 = 4; + pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 0x46, 0x41, 0x31]; + + // We use the term FOOTER_STRUCT to refer to the fixed-length portion of the Footer, as illustrated below. 
+ // + // Footer + // | + // ------------------------------------------------- + // | | + // Magic FooterPayload FooterPayloadLength Flags Magic + // | | + // ----------------------------- + // | + // FOOTER_STRUCT + + const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0; + const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4; + const FOOTER_STRUCT_FLAGS_OFFSET: u8 = FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET + + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH; + pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4; + const FOOTER_STRUCT_MAGIC_OFFSET: u8 = + FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH; + pub(crate) const FOOTER_STRUCT_LENGTH: u8 = + FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH; + + fn check_magic(bytes: &[u8]) -> Result<()> { + if bytes != FileMetadata::MAGIC { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Bad magic value: {:?} should be {:?}", + bytes, + FileMetadata::MAGIC + ), + )) + } else { + Ok(()) + } + } + + async fn read_footer_payload_length( + file_read: &dyn FileRead, + input_file_length: u64, + ) -> Result<u32> { + let start = input_file_length - u64::from(FileMetadata::FOOTER_STRUCT_LENGTH); + let end = start + u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH); + let footer_payload_length_bytes = file_read.read(start..end).await?; + let mut buf = [0; 4]; + buf.copy_from_slice(&footer_payload_length_bytes); + let footer_payload_length = u32::from_le_bytes(buf); + Ok(footer_payload_length) + } + + async fn read_footer_bytes( + file_read: &dyn FileRead, + input_file_length: u64, + footer_payload_length: u32, + ) -> Result<Bytes> { + let footer_length = u64::from(footer_payload_length) + + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH) + + u64::from(FileMetadata::MAGIC_LENGTH); + let start = input_file_length - footer_length; + let end = input_file_length; + file_read.read(start..end).await + } + + fn err_out_of_bounds<T>() -> Result<T> { + 
Err(Error::new( + ErrorKind::DataInvalid, + "Index range is out of bounds.", + )) + } + + fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> { + let mut flags = HashSet::new(); + for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH { + let byte_offset = footer_bytes.len() + - usize::from(FileMetadata::MAGIC_LENGTH) + - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH) + + usize::from(byte_number); + + let mut flag_byte = match footer_bytes.get(byte_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(byte) => Ok(*byte), + }?; + let mut bit_number = 0; + while flag_byte != 0 { + if flag_byte & 0x1 != 0 { + match Flag::from(&(ByteNumber(byte_number), BitNumber(bit_number))) { + Some(flag) => flags.insert(flag), + None => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Unknown flag byte {} and bit {} combination", + byte_number, bit_number + ), + )) + } + }; + } + flag_byte >>= 1; + bit_number += 1; + } + } + Ok(flags) + } + + fn extract_footer_payload_as_str( + footer_bytes: &[u8], + footer_payload_length: u32, + ) -> Result<String> { + let flags = FileMetadata::decode_flags(footer_bytes)?; + let footer_compression_codec = if flags.contains(&Flag::FooterPayloadCompressed) { + CompressionCodec::Lz4 + } else { + CompressionCodec::None + }; + + let start_offset = usize::from(FileMetadata::MAGIC_LENGTH); + let end_offset = + usize::from(FileMetadata::MAGIC_LENGTH) + usize::try_from(footer_payload_length)?; + let footer_payload_bytes = match footer_bytes.get(start_offset..end_offset) { + None => FileMetadata::err_out_of_bounds(), + Some(data) => Ok(data), + }?; + let decompressed_footer_payload_bytes = + footer_compression_codec.decompress(footer_payload_bytes.into())?; + + match String::from_utf8(decompressed_footer_payload_bytes) { + Err(src) => Err(Error::new( + ErrorKind::DataInvalid, + "Footer is not a valid UTF-8 string", + ) + .with_source(src)), + Ok(str) => Ok(str), + } + } + + fn from_json_str(string: &str) -> 
Result<FileMetadata> { + match serde_json::from_str::<FileMetadata>(string) { + Ok(file_metadata) => Ok(file_metadata), + Err(src) => Err( + Error::new(ErrorKind::DataInvalid, "Given string is not valid JSON") + .with_source(src), + ), + } + } + + #[rustfmt::skip] + /// Returns the file metadata about a Puffin file + pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> { + let file_read = input_file.reader().await?; + + let first_four_bytes = file_read.read(0..FileMetadata::MAGIC_LENGTH.into()).await?; + FileMetadata::check_magic(&first_four_bytes)?; + + let input_file_length = input_file.metadata().await?.size; + let footer_payload_length = FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?; + let footer_bytes = FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length).await?; + + let magic_length = usize::from(FileMetadata::MAGIC_LENGTH); + FileMetadata::check_magic(&footer_bytes[..magic_length])?; // first four bytes of footer Review Comment: We can put the comment before this line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org