kevinjqliu commented on code in PR #1851:
URL: https://github.com/apache/iceberg-rust/pull/1851#discussion_r2532399241
##########
crates/iceberg/src/spec/table_properties.rs:
##########
@@ -137,6 +162,21 @@ impl TableProperties {
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str =
"write.target-file-size-bytes";
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 *
1024 * 1024; // 512 MB
+
+ /// Compression codec for metadata files (JSON)
+ pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str =
"write.metadata.compression-codec";
+ /// Default metadata compression codec - uncompressed
+ pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str =
"uncompressed";
Review Comment:
```suggestion
/// Default metadata compression codec - uncompressed
pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
```
This should be "none" according to the write properties documentation:
https://iceberg.apache.org/docs/nightly/configuration/#write-properties
<img width="769" height="68" alt="Image"
src="https://github.com/user-attachments/assets/75c8885b-4653-43ce-978b-6573646f6e69"
/>
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -410,6 +444,7 @@ impl<'a> SnapshotProducer<'a> {
self.table.metadata().current_snapshot_id(),
next_seq_num,
Some(first_row_id),
+ compression,
Review Comment:
```suggestion
compression.clone(),
```
Should this be `.clone()`? Or should the rest be written without `.clone()`?
##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+ /// The compression codec name (e.g., "gzip", "zstd", "deflate",
"uncompressed")
+ pub codec: String,
+ /// The compression level (None uses codec-specific defaults: gzip=9,
zstd=1)
+ pub level: Option<u8>,
+}
+
+impl CompressionSettings {
+ /// Create a new CompressionSettings with the specified codec and level.
+ pub fn new(codec: String, level: Option<u8>) -> Self {
+ Self { codec, level }
+ }
+
+ /// Convert to apache_avro::Codec using the codec_from_str helper function.
+ pub(crate) fn to_codec(&self) -> Codec {
+ codec_from_str(Some(&self.codec), self.level)
+ }
+}
+
+impl Default for CompressionSettings {
+ fn default() -> Self {
+ use crate::spec::TableProperties;
+ Self {
+ codec:
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+ level: None,
+ }
+ }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd",
"deflate", "uncompressed")
+/// * `level` - The compression level (None uses codec defaults: gzip=9,
zstd=1). For deflate/gzip:
+/// - 0: NoCompression
+/// - 1: BestSpeed
+/// - 9: BestCompression
+/// - 10: UberCompression
+/// - Other values: DefaultLevel (6)
+///
+/// # Supported Codecs
+///
+/// - `gzip` or `deflate`: Uses Deflate compression with specified level
(default: 9)
+/// - `zstd`: Uses Zstandard compression (default: 1, level clamped to valid
zstd range 0-22)
+/// - `uncompressed` or `None`: No compression
+/// - Any other value: Defaults to no compression (Codec::Null)
+///
+/// # Compression Levels
+///
+/// The compression level mapping is based on miniz_oxide's CompressionLevel
enum:
+/// - Level 0: No compression
+/// - Level 1: Best speed (fastest)
+/// - Level 9: Best compression (slower, better compression)
+/// - Level 10: Uber compression (slowest, best compression)
+/// - Other: Default level (balanced speed/compression)
+pub(crate) fn codec_from_str(codec: Option<&str>, level: Option<u8>) -> Codec {
+ use apache_avro::{DeflateSettings, ZstandardSettings};
+
+ match codec {
Review Comment:
nit: Java uses a case-insensitive comparison; should we do the same here?
https://github.com/apache/iceberg/blob/700575f6e58c655ba680a0e5cc25d5a88c225d3c/core/src/main/java/org/apache/iceberg/avro/Avro.java#L252
##########
crates/iceberg/src/spec/manifest/writer.rs:
##########
@@ -410,7 +423,11 @@ impl ManifestWriter {
// Manifest schema did not change between V2 and V3
FormatVersion::V2 | FormatVersion::V3 =>
manifest_schema_v2(&partition_type)?,
};
- let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
+
+ // Determine compression codec using CompressionSettings
Review Comment:
```suggestion
```
##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -461,9 +461,57 @@ impl TableMetadata {
file_io: &FileIO,
metadata_location: impl AsRef<str>,
) -> Result<()> {
+ use std::io::Write as _;
+
+ use flate2::write::GzEncoder;
+
+ let json_data = serde_json::to_vec(self)?;
+
+ // Check if compression is enabled via table properties
+ let codec = self
+ .properties
+
.get(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+ .map(|s| s.as_str())
+
.unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
+
+ let (data_to_write, actual_location) = match codec {
+ "gzip" => {
+ let mut encoder = GzEncoder::new(Vec::new(),
flate2::Compression::default());
+ encoder.write_all(&json_data).map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Failed to compress metadata with gzip",
+ )
+ .with_source(e)
+ })?;
+ let compressed_data = encoder.finish().map_err(|e| {
+ Error::new(ErrorKind::DataInvalid, "Failed to finish gzip
compression")
+ .with_source(e)
+ })?;
+
+ // Modify filename to add .gz before .metadata.json
+ let location = metadata_location.as_ref();
+ let new_location = if location.ends_with(".metadata.json") {
+ location.replace(".metadata.json", ".gz.metadata.json")
+ } else {
+ // If it doesn't end with expected pattern, just append .gz
+ format!("{}.gz", location)
+ };
+
+ (compressed_data, new_location)
+ }
+ "uncompressed" | "" => (json_data,
metadata_location.as_ref().to_string()),
Review Comment:
```suggestion
"none" | "" => (json_data,
metadata_location.as_ref().to_string()),
```
##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+ /// The compression codec name (e.g., "gzip", "zstd", "deflate",
"uncompressed")
+ pub codec: String,
+ /// The compression level (None uses codec-specific defaults: gzip=9,
zstd=1)
+ pub level: Option<u8>,
+}
+
+impl CompressionSettings {
+ /// Create a new CompressionSettings with the specified codec and level.
+ pub fn new(codec: String, level: Option<u8>) -> Self {
+ Self { codec, level }
+ }
+
+ /// Convert to apache_avro::Codec using the codec_from_str helper function.
+ pub(crate) fn to_codec(&self) -> Codec {
+ codec_from_str(Some(&self.codec), self.level)
+ }
+}
+
+impl Default for CompressionSettings {
+ fn default() -> Self {
+ use crate::spec::TableProperties;
+ Self {
+ codec:
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+ level: None,
+ }
+ }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd",
"deflate", "uncompressed")
Review Comment:
Is it easy to add snappy so that we completely match all the codecs supported
in Java?
##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+ /// The compression codec name (e.g., "gzip", "zstd", "deflate",
"uncompressed")
+ pub codec: String,
+ /// The compression level (None uses codec-specific defaults: gzip=9,
zstd=1)
+ pub level: Option<u8>,
+}
+
+impl CompressionSettings {
+ /// Create a new CompressionSettings with the specified codec and level.
+ pub fn new(codec: String, level: Option<u8>) -> Self {
+ Self { codec, level }
+ }
+
+ /// Convert to apache_avro::Codec using the codec_from_str helper function.
+ pub(crate) fn to_codec(&self) -> Codec {
+ codec_from_str(Some(&self.codec), self.level)
+ }
+}
+
+impl Default for CompressionSettings {
+ fn default() -> Self {
+ use crate::spec::TableProperties;
+ Self {
+ codec:
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+ level: None,
+ }
+ }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd",
"deflate", "uncompressed")
Review Comment:
"deflate" is not a valid codec option in java, but gzip uses
CodecFactory.deflateCodec
https://github.com/apache/iceberg/blob/700575f6e58c655ba680a0e5cc25d5a88c225d3c/core/src/main/java/org/apache/iceberg/avro/Avro.java#L264-L267
I think we can just remove deflate.
##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -461,9 +461,57 @@ impl TableMetadata {
file_io: &FileIO,
metadata_location: impl AsRef<str>,
) -> Result<()> {
+ use std::io::Write as _;
+
+ use flate2::write::GzEncoder;
+
+ let json_data = serde_json::to_vec(self)?;
+
+ // Check if compression is enabled via table properties
+ let codec = self
+ .properties
+
.get(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+ .map(|s| s.as_str())
+
.unwrap_or(crate::spec::table_properties::TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
+
+ let (data_to_write, actual_location) = match codec {
+ "gzip" => {
Review Comment:
nit: Java uses a case-insensitive comparison; should we do the same here?
https://github.com/apache/iceberg/blob/700575f6e58c655ba680a0e5cc25d5a88c225d3c/core/src/main/java/org/apache/iceberg/TableMetadataParser.java#L63
##########
crates/iceberg/src/spec/avro_util.rs:
##########
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for working with Apache Avro in Iceberg.
+
+use apache_avro::Codec;
+use log::warn;
+
+/// Settings for compression codec and level.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CompressionSettings {
+ /// The compression codec name (e.g., "gzip", "zstd", "deflate",
"uncompressed")
+ pub codec: String,
+ /// The compression level (None uses codec-specific defaults: gzip=9,
zstd=1)
+ pub level: Option<u8>,
+}
+
+impl CompressionSettings {
+ /// Create a new CompressionSettings with the specified codec and level.
+ pub fn new(codec: String, level: Option<u8>) -> Self {
+ Self { codec, level }
+ }
+
+ /// Convert to apache_avro::Codec using the codec_from_str helper function.
+ pub(crate) fn to_codec(&self) -> Codec {
+ codec_from_str(Some(&self.codec), self.level)
+ }
+}
+
+impl Default for CompressionSettings {
+ fn default() -> Self {
+ use crate::spec::TableProperties;
+ Self {
+ codec:
TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT.to_string(),
+ level: None,
+ }
+ }
+}
+
+/// Convert codec name and level to apache_avro::Codec.
+/// Returns Codec::Null for unknown or unsupported codecs.
+///
+/// # Arguments
+///
+/// * `codec` - The name of the compression codec (e.g., "gzip", "zstd",
"deflate", "uncompressed")
+/// * `level` - The compression level (None uses codec defaults: gzip=9,
zstd=1). For deflate/gzip:
+/// - 0: NoCompression
+/// - 1: BestSpeed
+/// - 9: BestCompression
+/// - 10: UberCompression
+/// - Other values: DefaultLevel (6)
+///
+/// # Supported Codecs
+///
+/// - `gzip` or `deflate`: Uses Deflate compression with specified level
(default: 9)
+/// - `zstd`: Uses Zstandard compression (default: 1, level clamped to valid
zstd range 0-22)
+/// - `uncompressed` or `None`: No compression
+/// - Any other value: Defaults to no compression (Codec::Null)
Review Comment:
Java throws on unknown codecs,
https://github.com/apache/iceberg/blob/700575f6e58c655ba680a0e5cc25d5a88c225d3c/core/src/main/java/org/apache/iceberg/avro/Avro.java#L269-L270
but I think it's fine if we want to return Codec::Null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]