This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new dff67c9b78 GH-7686: [Parquet] Fix int96 min/max stats (#7687)
dff67c9b78 is described below
commit dff67c9b78bbd6f2311f580ecc20e97e71e013db
Author: Rahul Sharma <[email protected]>
AuthorDate: Wed Jul 23 00:04:06 2025 +0200
GH-7686: [Parquet] Fix int96 min/max stats (#7687)
# Which issue does this PR close?
- Closes #7686
# Rationale for this change
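The int96 min/max statistics emitted by arrow-rs are incorrect: `Int96` values
were ordered by the derived `PartialOrd`, which compares the underlying
`[u32; 3]` element by element starting with the low nanosecond word, instead of
comparing the day first and then the nanoseconds within the day.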
# What changes are included in this PR?
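1. Fix the int96 stats by replacing the derived `PartialOrd` on `Int96` with
explicit `PartialOrd`/`Ord` implementations that compare the day first and then
the nanoseconds within the day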
2. Add round-trip test to verify the behavior
# Not included in this PR:
1. Reading int96 stats only from known-good writers. This will be implemented
after a new arrow-rs release.
# Are there any user-facing changes?
Yes: the int96 min/max statistics written by arrow-rs will differ from those
written by earlier versions, and will now be correct.
---------
Co-authored-by: Rahul Sharma <[email protected]>
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Alkis Evlogimenos <[email protected]>
---
parquet/src/column/writer/mod.rs | 4 +-
parquet/src/data_type.rs | 38 ++++++++-
parquet/src/file/statistics.rs | 3 -
parquet/tests/int96_stats_roundtrip.rs | 151 +++++++++++++++++++++++++++++++++
4 files changed, 187 insertions(+), 9 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index db7cd31468..9374e226b8 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2528,8 +2528,8 @@ mod tests {
let stats = statistics_roundtrip::<Int96Type>(&input);
assert!(!stats.is_min_max_backwards_compatible());
if let Statistics::Int96(stats) = stats {
- assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20,
30]));
- assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20,
10]));
+ assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20,
10]));
+ assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20,
30]));
} else {
panic!("expecting Statistics::Int96, got {stats:?}");
}
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 639567f604..6cba02ab3e 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -33,7 +33,7 @@ use crate::util::bit_util::FromBytes;
/// Rust representation for logical type INT96, value is backed by an array of
`u32`.
/// The type only takes 12 bytes, without extra padding.
-#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Int96 {
value: [u32; 3],
}
@@ -118,14 +118,44 @@ impl Int96 {
.wrapping_add(nanos)
}
+ #[inline]
+ fn get_days(&self) -> i32 {
+ self.data()[2] as i32
+ }
+
+ #[inline]
+ fn get_nanos(&self) -> i64 {
+ ((self.data()[1] as i64) << 32) + self.data()[0] as i64
+ }
+
#[inline]
fn data_as_days_and_nanos(&self) -> (i32, i64) {
- let day = self.data()[2] as i32;
- let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
- (day, nanos)
+ (self.get_days(), self.get_nanos())
+ }
+}
+
+impl PartialOrd for Int96 {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
}
}
+impl Ord for Int96 {
+ /// Order `Int96` correctly for (deprecated) timestamp types.
+ ///
+ /// Note: this is done even though the Int96 type is deprecated and the
+ /// [spec does not define the sort order]
+ /// because some engines, notably Spark and Databricks Photon still write
+ /// Int96 timestamps and rely on their order for optimization.
+ ///
+ /// [spec does not define the sort order]:
https://github.com/apache/parquet-format/blob/cf943c197f4fad826b14ba0c40eb0ffdab585285/src/main/thrift/parquet.thrift#L1079
+ fn cmp(&self, other: &Self) -> Ordering {
+ match self.get_days().cmp(&other.get_days()) {
+ Ordering::Equal => self.get_nanos().cmp(&other.get_nanos()),
+ ord => ord,
+ }
+ }
+}
impl From<Vec<u32>> for Int96 {
fn from(buf: Vec<u32>) -> Self {
assert_eq!(buf.len(), 3);
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 0cfcb4d925..d0105461f1 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -209,9 +209,6 @@ pub fn from_thrift(
old_format,
),
Type::INT96 => {
- // INT96 statistics may not be correct, because comparison
is signed
- // byte-wise, not actual timestamps. It is recommended to
ignore
- // min/max statistics for INT96 columns.
let min = if let Some(data) = min {
assert_eq!(data.len(), 12);
Some(Int96::try_from_le_slice(&data)?)
diff --git a/parquet/tests/int96_stats_roundtrip.rs
b/parquet/tests/int96_stats_roundtrip.rs
new file mode 100644
index 0000000000..d6ba8d419e
--- /dev/null
+++ b/parquet/tests/int96_stats_roundtrip.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use chrono::{DateTime, NaiveDateTime, Utc};
+use parquet::basic::Type;
+use parquet::data_type::{Int96, Int96Type};
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::statistics::Statistics;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::schema::parser::parse_message_type;
+use rand::seq::SliceRandom;
+use std::fs::File;
+use std::sync::Arc;
+use tempfile::Builder;
+
+fn datetime_to_int96(dt: &str) -> Int96 {
+ let naive = NaiveDateTime::parse_from_str(dt, "%Y-%m-%d
%H:%M:%S%.f").unwrap();
+ let datetime: DateTime<Utc> = DateTime::from_naive_utc_and_offset(naive,
Utc);
+ let nanos = datetime.timestamp_nanos_opt().unwrap();
+ let mut int96 = Int96::new();
+ const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
+ const NANOSECONDS_IN_DAY: i64 = 86_400_000_000_000;
+ let days = nanos / NANOSECONDS_IN_DAY;
+ let remaining_nanos = nanos % NANOSECONDS_IN_DAY;
+ let julian_day = (days + JULIAN_DAY_OF_EPOCH) as i32;
+ let julian_day_u32 = julian_day as u32;
+ let nanos_low = (remaining_nanos & 0xFFFFFFFF) as u32;
+ let nanos_high = ((remaining_nanos >> 32) & 0xFFFFFFFF) as u32;
+ int96.set_data(nanos_low, nanos_high, julian_day_u32);
+ int96
+}
+
+fn verify_ordering(data: Vec<Int96>) {
+ // Create a temporary file
+ let tmp = Builder::new()
+ .prefix("test_int96_stats")
+ .tempfile()
+ .unwrap();
+ let file_path = tmp.path().to_owned();
+
+ // Create schema with INT96 field
+ let message_type = "
+ message test {
+ REQUIRED INT96 timestamp;
+ }
+ ";
+ let schema = parse_message_type(message_type).unwrap();
+
+ // Configure writer properties to enable statistics
+ let props = WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build();
+
+ let expected_min = data[0];
+ let expected_max = data[data.len() - 1];
+
+ {
+ let file = File::create(&file_path).unwrap();
+ let mut writer = SerializedFileWriter::new(file, schema.into(),
Arc::new(props)).unwrap();
+ let mut row_group = writer.next_row_group().unwrap();
+ let mut col_writer = row_group.next_column().unwrap().unwrap();
+
+ {
+ let writer = col_writer.typed::<Int96Type>();
+ let mut shuffled_data = data.clone();
+ shuffled_data.shuffle(&mut rand::rng());
+ writer.write_batch(&shuffled_data, None, None).unwrap();
+ }
+ col_writer.close().unwrap();
+ row_group.close().unwrap();
+ writer.close().unwrap();
+ }
+
+ let file = File::open(&file_path).unwrap();
+ let reader = SerializedFileReader::new(file).unwrap();
+ let metadata = reader.metadata();
+ let row_group = metadata.row_group(0);
+ let column = row_group.column(0);
+
+ let stats = column.statistics().unwrap();
+ assert_eq!(stats.physical_type(), Type::INT96);
+
+ if let Statistics::Int96(stats) = stats {
+ let min = stats.min_opt().unwrap();
+ let max = stats.max_opt().unwrap();
+
+ assert_eq!(
+ *min, expected_min,
+ "Min value should be {expected_min} but was {min}"
+ );
+ assert_eq!(
+ *max, expected_max,
+ "Max value should be {expected_max} but was {max}"
+ );
+ assert_eq!(stats.null_count_opt(), Some(0));
+ } else {
+ panic!("Expected Int96 statistics");
+ }
+}
+
+#[test]
+fn test_multiple_dates() {
+ let data = vec![
+ datetime_to_int96("2020-01-01 00:00:00.000"),
+ datetime_to_int96("2020-02-29 23:59:59.000"),
+ datetime_to_int96("2020-12-31 23:59:59.000"),
+ datetime_to_int96("2021-01-01 00:00:00.000"),
+ datetime_to_int96("2023-06-15 12:30:45.000"),
+ datetime_to_int96("2024-02-29 15:45:30.000"),
+ datetime_to_int96("2024-12-25 07:00:00.000"),
+ datetime_to_int96("2025-01-01 00:00:00.000"),
+ datetime_to_int96("2025-07-04 20:00:00.000"),
+ datetime_to_int96("2025-12-31 23:59:59.000"),
+ ];
+ verify_ordering(data);
+}
+
+#[test]
+fn test_same_day_different_time() {
+ let data = vec![
+ datetime_to_int96("2020-01-01 00:01:00.000"),
+ datetime_to_int96("2020-01-01 00:02:00.000"),
+ datetime_to_int96("2020-01-01 00:03:00.000"),
+ ];
+ verify_ordering(data);
+}
+
+#[test]
+fn test_increasing_day_decreasing_time() {
+ let data = vec![
+ datetime_to_int96("2020-01-01 12:00:00.000"),
+ datetime_to_int96("2020-02-01 11:00:00.000"),
+ datetime_to_int96("2020-03-01 10:00:00.000"),
+ ];
+ verify_ordering(data);
+}