liurenjie1024 commented on code in PR #765:
URL: https://github.com/apache/iceberg-rust/pull/765#discussion_r1900504741


##########
crates/iceberg/src/puffin/metadata.rs:
##########
@@ -0,0 +1,797 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+    pub(crate) r#type: String,

Review Comment:
   Should this be an enum?



##########
crates/iceberg/src/puffin/metadata.rs:
##########
@@ -0,0 +1,797 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {

Review Comment:
   Should we make them pub? I think user want to access them. Also should we 
provide a builder?



##########
crates/iceberg/src/puffin/metadata.rs:
##########
@@ -0,0 +1,797 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+    pub(crate) r#type: String,
+    /// List of field IDs the blob was computed for; the order of items is 
used to compute sketches stored in the blob.
+    #[serde(rename = "fields")]
+    pub(crate) input_fields: Vec<i32>,

Review Comment:
   ```suggestion
       pub(crate) fields: Vec<i32>,
   ```
   A little confusing why not use `fields` directly?



##########
crates/iceberg/src/puffin/metadata.rs:
##########
@@ -0,0 +1,797 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+    pub(crate) r#type: String,
+    /// List of field IDs the blob was computed for; the order of items is 
used to compute sketches stored in the blob.
+    #[serde(rename = "fields")]
+    pub(crate) input_fields: Vec<i32>,
+    /// ID of the Iceberg table's snapshot the blob was computed from
+    pub(crate) snapshot_id: i64,
+    /// Sequence number of the Iceberg table's snapshot the blob was computed 
from
+    pub(crate) sequence_number: i64,
+    /// The offset in the file where the blob contents start
+    pub(crate) offset: u64,
+    /// The length of the blob stored in the file (after compression, if 
compressed)
+    pub(crate) length: usize,
+    /// The compression codec used to compress the data
+    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
+    #[serde(default)]
+    pub(crate) compression_codec: CompressionCodec,
+    /// Arbitrary meta-information about the blob
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    #[serde(default)]
+    pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub(crate) enum Flag {
+    FooterPayloadCompressed,
+}
+
+#[derive(PartialEq, Eq, Hash)]
+pub(crate) struct ByteNumber(pub u8);
+
+#[derive(PartialEq, Eq, Hash)]
+pub(crate) struct BitNumber(pub u8);
+
+static FLAGS_BY_BYTE_AND_BIT: Lazy<HashMap<(ByteNumber, BitNumber), Flag>> = 
Lazy::new(|| {
+    let mut m = HashMap::new();
+    m.insert(
+        (
+            Flag::FooterPayloadCompressed.byte_number(),
+            Flag::FooterPayloadCompressed.bit_number(),
+        ),
+        Flag::FooterPayloadCompressed,
+    );
+    m
+});
+
+impl Flag {
+    pub(crate) fn byte_number(&self) -> ByteNumber {
+        match self {
+            Flag::FooterPayloadCompressed => ByteNumber(0),
+        }
+    }
+
+    pub(crate) fn bit_number(&self) -> BitNumber {
+        match self {
+            Flag::FooterPayloadCompressed => BitNumber(0),
+        }
+    }
+
+    fn from(byte_and_bit: &(ByteNumber, BitNumber)) -> Option<Flag> {
+        FLAGS_BY_BYTE_AND_BIT.get(byte_and_bit).cloned()
+    }
+}
+
+/// Metadata about a puffin file.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#filemetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+pub(crate) struct FileMetadata {
+    /// Metadata about blobs in file
+    pub(crate) blobs: Vec<BlobMetadata>,
+    /// Arbitrary meta-information, like writer identification/version.
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    #[serde(default)]
+    pub(crate) properties: HashMap<String, String>,
+}
+
+impl FileMetadata {
+    pub(crate) const MAGIC_LENGTH: u8 = 4;
+    pub(crate) const MAGIC: [u8; FileMetadata::MAGIC_LENGTH as usize] = [0x50, 
0x46, 0x41, 0x31];
+
+    // We use the term FOOTER_STRUCT to refer to the fixed-length portion of 
the Footer, as illustrated below.
+    //
+    //                        Footer
+    //                          |
+    //  -------------------------------------------------
+    // |                                                 |
+    // Magic FooterPayload FooterPayloadLength Flags Magic
+    //                     |                             |
+    //                      -----------------------------
+    //                                    |
+    //                              FOOTER_STRUCT
+
+    const FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET: u8 = 0;
+    const FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH: u8 = 4;
+    const FOOTER_STRUCT_FLAGS_OFFSET: u8 = 
FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_OFFSET
+        + FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH;
+    pub(crate) const FOOTER_STRUCT_FLAGS_LENGTH: u8 = 4;
+    const FOOTER_STRUCT_MAGIC_OFFSET: u8 =
+        FileMetadata::FOOTER_STRUCT_FLAGS_OFFSET + 
FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH;
+    pub(crate) const FOOTER_STRUCT_LENGTH: u8 =
+        FileMetadata::FOOTER_STRUCT_MAGIC_OFFSET + FileMetadata::MAGIC_LENGTH;
+
+    fn check_magic(bytes: &[u8]) -> Result<()> {
+        if bytes == FileMetadata::MAGIC {
+            Ok(())
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Bad magic value: {:?} should be {:?}",
+                    bytes,
+                    FileMetadata::MAGIC
+                ),
+            ))
+        }
+    }
+
+    async fn read_footer_payload_length(
+        file_read: &dyn FileRead,
+        input_file_length: u64,
+    ) -> Result<u32> {
+        let start = input_file_length - 
u64::from(FileMetadata::FOOTER_STRUCT_LENGTH);
+        let end = start + 
u64::from(FileMetadata::FOOTER_STRUCT_PAYLOAD_LENGTH_LENGTH);
+        let footer_payload_length_bytes = file_read.read(start..end).await?;
+        let mut buf = [0; 4];
+        buf.copy_from_slice(&footer_payload_length_bytes);
+        let footer_payload_length = u32::from_le_bytes(buf);
+        Ok(footer_payload_length)
+    }
+
+    async fn read_footer_bytes(
+        file_read: &dyn FileRead,
+        input_file_length: u64,
+        footer_payload_length: u32,
+    ) -> Result<Bytes> {
+        let footer_length = u64::from(footer_payload_length)
+            + u64::from(FileMetadata::FOOTER_STRUCT_LENGTH)
+            + u64::from(FileMetadata::MAGIC_LENGTH);
+        let start = input_file_length - footer_length;
+        let end = input_file_length;
+        file_read.read(start..end).await
+    }
+
+    fn decode_flags(footer_bytes: &[u8]) -> Result<HashSet<Flag>> {
+        let mut flags = HashSet::new();
+        for byte_number in 0..FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH {
+            let byte_offset = footer_bytes.len()
+                - usize::from(FileMetadata::MAGIC_LENGTH)
+                - usize::from(FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH)
+                + usize::from(byte_number);
+
+            let mut flag_byte = *footer_bytes.get(byte_offset).ok_or_else(|| {
+                Error::new(ErrorKind::DataInvalid, "Index range is out of 
bounds.")
+            })?;
+            let mut bit_number = 0;
+            while flag_byte != 0 {
+                if flag_byte & 0x1 != 0 {
+                    match Flag::from(&(ByteNumber(byte_number), 
BitNumber(bit_number))) {
+                        Some(flag) => flags.insert(flag),
+                        None => {
+                            return Err(Error::new(
+                                ErrorKind::DataInvalid,
+                                format!(
+                                    "Unknown flag byte {} and bit {} 
combination",
+                                    byte_number, bit_number
+                                ),
+                            ))
+                        }
+                    };
+                }
+                flag_byte >>= 1;
+                bit_number += 1;
+            }
+        }
+        Ok(flags)
+    }
+
+    fn extract_footer_payload_as_str(
+        footer_bytes: &[u8],
+        footer_payload_length: u32,
+    ) -> Result<String> {
+        let flags = FileMetadata::decode_flags(footer_bytes)?;
+        let footer_compression_codec = if 
flags.contains(&Flag::FooterPayloadCompressed) {
+            CompressionCodec::Lz4
+        } else {
+            CompressionCodec::None
+        };
+
+        let start_offset = usize::from(FileMetadata::MAGIC_LENGTH);
+        let end_offset =
+            usize::from(FileMetadata::MAGIC_LENGTH) + 
usize::try_from(footer_payload_length)?;
+        let footer_payload_bytes = footer_bytes
+            .get(start_offset..end_offset)
+            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Index range is 
out of bounds."))?;
+        let decompressed_footer_payload_bytes =
+            footer_compression_codec.decompress(footer_payload_bytes.into())?;
+
+        String::from_utf8(decompressed_footer_payload_bytes).map_err(|src| {
+            Error::new(ErrorKind::DataInvalid, "Footer is not a valid UTF-8 
string")
+                .with_source(src)
+        })
+    }
+
+    fn from_json_str(string: &str) -> Result<FileMetadata> {
+        serde_json::from_str::<FileMetadata>(string).map_err(|src| {
+            Error::new(ErrorKind::DataInvalid, "Given string is not valid 
JSON").with_source(src)
+        })
+    }
+
+    /// Returns the file metadata about a Puffin file
+    pub(crate) async fn read(input_file: &InputFile) -> Result<FileMetadata> {

Review Comment:
   I have some concerns about this api. Should we consider having a 
`PuffinReader` like java?



##########
crates/iceberg/src/puffin/metadata.rs:
##########
@@ -0,0 +1,797 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use bytes::Bytes;
+use once_cell::sync::Lazy;
+use serde::{Deserialize, Serialize};
+
+use crate::io::{FileRead, InputFile};
+use crate::puffin::compression::CompressionCodec;
+use crate::{Error, ErrorKind, Result};
+
+/// Human-readable identification of the application writing the file, along 
with its version.
+/// Example: "Trino version 381"
+pub(crate) const CREATED_BY_PROPERTY: &str = "created-by";
+
+/// Metadata about a blob.
+/// For more information, see: 
https://iceberg.apache.org/puffin-spec/#blobmetadata
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub(crate) struct BlobMetadata {
+    /// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
+    pub(crate) r#type: String,
+    /// List of field IDs the blob was computed for; the order of items is 
used to compute sketches stored in the blob.
+    #[serde(rename = "fields")]
+    pub(crate) input_fields: Vec<i32>,
+    /// ID of the Iceberg table's snapshot the blob was computed from
+    pub(crate) snapshot_id: i64,
+    /// Sequence number of the Iceberg table's snapshot the blob was computed 
from
+    pub(crate) sequence_number: i64,
+    /// The offset in the file where the blob contents start
+    pub(crate) offset: u64,
+    /// The length of the blob stored in the file (after compression, if 
compressed)
+    pub(crate) length: usize,
+    /// The compression codec used to compress the data
+    #[serde(skip_serializing_if = "CompressionCodec::is_none")]
+    #[serde(default)]
+    pub(crate) compression_codec: CompressionCodec,
+    /// Arbitrary meta-information about the blob
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    #[serde(default)]
+    pub(crate) properties: HashMap<String, String>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub(crate) enum Flag {
+    FooterPayloadCompressed,
+}
+
+#[derive(PartialEq, Eq, Hash)]
+pub(crate) struct ByteNumber(pub u8);
+
+#[derive(PartialEq, Eq, Hash)]
+pub(crate) struct BitNumber(pub u8);

Review Comment:
   I feel that this part is a little over design, the spec says flags is always 
4 byte, so why not just like this:
   ```rust
   pub enum Flag {
      FooterPayloadCompressed = 0
   }
   
   impl Flag {
      pub(crate) fn byte_idx(self) -> u8 {
         (self as u8) / 8
      }
   
   pub(crate) fn bit_idx(self) -> u8 {
         (self as u8) % 8
      }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to