This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 71d27b6afc Test int96 Parquet file from Spark (#7367)
71d27b6afc is described below
commit 71d27b6afc26a8a3273164c58f105640e4e26d45
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Apr 4 07:29:33 2025 -0400
Test int96 Parquet file from Spark (#7367)
* Stash.
* Test int96_from_spark.parquet.
* Update parquet-testing to include int96_from_spark.parquet.
* Address feedback.
---
parquet-testing | 2 +-
parquet/src/arrow/arrow_reader/mod.rs | 111 ++++++++++++++++++++++++++++++++--
2 files changed, 106 insertions(+), 7 deletions(-)
diff --git a/parquet-testing b/parquet-testing
index f4d7ed772a..6e851ddd76 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882
+Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 66780fcd60..5b4cf49c7f 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -959,12 +959,6 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
- use bytes::Bytes;
- use half::f16;
- use num::PrimInt;
- use rand::{rng, Rng, RngCore};
- use tempfile::tempfile;
-
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
@@ -978,6 +972,11 @@ mod tests {
ArrowError, DataType as ArrowDataType, Field, Fields, Schema,
SchemaRef, TimeUnit,
};
use arrow_select::concat::concat_batches;
+ use bytes::Bytes;
+ use half::f16;
+ use num::PrimInt;
+ use rand::{rng, Rng, RngCore};
+ use tempfile::tempfile;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions,
ParquetRecordBatchReader,
@@ -1563,6 +1562,106 @@ mod tests {
})
}
+    #[test]
+    fn test_int96_from_spark_file_with_provided_schema() {
+        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
+        // range for resolution compared to a nanosecond timestamp. We must provide a schema with
+        // microsecond resolution for the Parquet reader to interpret these values correctly.
+        use arrow_schema::DataType::Timestamp;
+        let test_data = arrow::util::test_util::parquet_test_data();
+        let path = format!("{test_data}/int96_from_spark.parquet");
+        let file = File::open(path).unwrap();
+
+        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
+            "a",
+            Timestamp(TimeUnit::Microsecond, None),
+            true,
+        )]));
+        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
+
+        let mut record_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
+                .unwrap()
+                .build()
+                .unwrap();
+
+        let batch = record_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        let column = batch.column(0);
+        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));
+
+        let expected = Arc::new(Int64Array::from(vec![
+            Some(1704141296123456),
+            Some(1704070800000000),
+            Some(253402225200000000),
+            Some(1735599600000000),
+            None,
+            Some(9089380393200000000),
+        ]));
+
+        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
+        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
+        // anyway, so this should be a zero-copy non-modifying cast.
+
+        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
+        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
+
+        assert_eq!(casted_timestamps.len(), expected.len());
+
+        casted_timestamps
+            .iter()
+            .zip(expected.iter())
+            .for_each(|(lhs, rhs)| {
+                assert_eq!(lhs, rhs);
+            });
+    }
+
+    #[test]
+    fn test_int96_from_spark_file_without_provided_schema() {
+        // int96_from_spark.parquet was written based on Spark's microsecond timestamps which trade
+        // range for resolution compared to a nanosecond timestamp. Without a provided schema, some
+        // values when read as nanosecond resolution overflow and result in garbage values.
+        use arrow_schema::DataType::Timestamp;
+        let test_data = arrow::util::test_util::parquet_test_data();
+        let path = format!("{test_data}/int96_from_spark.parquet");
+        let file = File::open(path).unwrap();
+
+        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let batch = record_reader.next().unwrap().unwrap();
+        assert_eq!(batch.num_columns(), 1);
+        let column = batch.column(0);
+        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));
+
+        let expected = Arc::new(Int64Array::from(vec![
+            Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
+            Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
+            Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
+            Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s)
+            None,
+            Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
+        ]));
+
+        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
+        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
+        // anyway, so this should be a zero-copy non-modifying cast.
+
+        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
+        let casted_timestamps = binding.as_primitive::<types::Int64Type>();
+
+        assert_eq!(casted_timestamps.len(), expected.len());
+
+        casted_timestamps
+            .iter()
+            .zip(expected.iter())
+            .for_each(|(lhs, rhs)| {
+                assert_eq!(lhs, rhs);
+            });
+    }
+
struct RandUtf8Gen {}
impl RandGen<ByteArrayType> for RandUtf8Gen {