This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new fe44918b94 [parquet] further improve logical type compatibility in ArrowWriter (#8095)
fe44918b94 is described below
commit fe44918b940e143978c46522aa5f00cd3541f2da
Author: albertlockett <[email protected]>
AuthorDate: Wed Aug 13 13:31:40 2025 -0300
[parquet] further improve logical type compatibility in ArrowWriter (#8095)
# Which issue does this PR close?
- Closes #8012
# Rationale for this change
In https://github.com/apache/arrow-rs/pull/8005 we loosened the restriction that the Arrow data types for a given column must be exactly the same between batches, by adding compatibility between dictionary and native arrays.
At the time, there was a [worthwhile suggestion](https://github.com/apache/arrow-rs/pull/8005#pullrequestreview-3058034840) that we extend this compatibility definition to include arrays that contain the same type of value (e.g. String, StringView and LargeString). This PR implements that suggestion.
# What changes are included in this PR?
This PR makes the Parquet ArrowWriter consider the following Arrow data types compatible:
- String, StringView, LargeString
- Binary, BinaryView, LargeBinary
It also improves the logic for detecting whether dictionary values are compatible. Previously, compatibility only existed between a Dictionary and a native array; now two Dictionary types are also considered compatible if they have compatible values, regardless of their key types.
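As a minimal sketch of the new behaviour (mirroring the doc example updated below; the output path and values are hypothetical), a `Utf8` batch and a `LargeUtf8` batch can now be written to the same column:

```rust
use std::fs::File;
use std::sync::Arc;
use arrow_array::{LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::ArrowWriter;

fn main() {
    // The writer's schema comes from the first batch, which declares the column as Utf8
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from_iter_values(vec!["a", "b"]))],
    )
    .unwrap();

    // The second batch declares the same column as LargeUtf8; with this change
    // the writer accepts it, because both types hold the same logical string values
    let batch2 = RecordBatch::try_new(
        Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
        vec![Arc::new(LargeStringArray::from_iter_values(vec!["c", "d"]))],
    )
    .unwrap();

    // hypothetical output path
    let file = File::create("/tmp/compat.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, None).unwrap();
    writer.write(&batch1).unwrap();
    writer.write(&batch2).unwrap();
    writer.close().unwrap();
}
```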
# Are these changes tested?
Yes, there are unit tests.
# Are there any user-facing changes?
No
---
parquet/src/arrow/arrow_writer/levels.rs | 34 +++-
parquet/src/arrow/arrow_writer/mod.rs | 267 ++++++++++++++++++++-----------
2 files changed, 208 insertions(+), 93 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 1956394ac5..3c283bcbe3 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -550,13 +550,41 @@ impl LevelInfoBuilder {
/// and the other is a native array, the dictionary values must have the same type as the
/// native array
fn types_compatible(a: &DataType, b: &DataType) -> bool {
+ // if the Arrow data types are the same, the types are clearly compatible
if a == b {
return true;
}
- match (a, b) {
- (DataType::Dictionary(_, v), b) => v.as_ref() == b,
- (a, DataType::Dictionary(_, v)) => a == v.as_ref(),
+ // get the values out of the dictionaries
+ let (a, b) = match (a, b) {
+ (DataType::Dictionary(_, va), DataType::Dictionary(_, vb)) => {
+ (va.as_ref(), vb.as_ref())
+ }
+ (DataType::Dictionary(_, v), b) => (v.as_ref(), b),
+ (a, DataType::Dictionary(_, v)) => (a, v.as_ref()),
+ _ => (a, b),
+ };
+
+ // now that we've got the values from one/both dictionaries, if the values
+ // have the same Arrow data type, they're compatible
+ if a == b {
+ return true;
+ }
+
+ // here we have different Arrow data types, but if the array contains the same type of data
+ // then we consider the type compatible
+ match a {
+ // String, StringView and LargeString are compatible
+ DataType::Utf8 => matches!(b, DataType::LargeUtf8 | DataType::Utf8View),
+ DataType::Utf8View => matches!(b, DataType::LargeUtf8 | DataType::Utf8),
+ DataType::LargeUtf8 => matches!(b, DataType::Utf8 | DataType::Utf8View),
+
+ // Binary, BinaryView and LargeBinary are compatible
+ DataType::Binary => matches!(b, DataType::LargeBinary | DataType::BinaryView),
+ DataType::BinaryView => matches!(b, DataType::LargeBinary | DataType::Binary),
+ DataType::LargeBinary => matches!(b, DataType::Binary | DataType::BinaryView),
+
+ // otherwise we have incompatible types
_ => false,
}
}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index d235f5fcab..c6b0b426f9 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -134,16 +134,21 @@ mod levels;
/// a given column, the writer can accept multiple Arrow [`DataType`]s that contain the same
/// value type.
///
-/// Currently, only compatibility between Arrow dictionary and native arrays are supported.
-/// Additional type compatibility may be added in future (see [issue #8012](https://github.com/apache/arrow-rs/issues/8012))
+/// For example, the following [`DataType`]s are all logically equivalent and can be written
+/// to the same column:
+/// * String, LargeString, StringView
+/// * Binary, LargeBinary, BinaryView
+///
+/// The writer will also accept both native and dictionary encoded arrays if the dictionaries
+/// contain compatible values.
/// ```
/// # use std::sync::Arc;
-/// # use arrow_array::{DictionaryArray, RecordBatch, StringArray, UInt8Array};
+/// # use arrow_array::{DictionaryArray, LargeStringArray, RecordBatch, StringArray, UInt8Array};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use parquet::arrow::arrow_writer::ArrowWriter;
/// let record_batch1 = RecordBatch::try_new(
-/// Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])),
-/// vec![Arc::new(StringArray::from_iter_values(vec!["a", "b"]))]
+/// Arc::new(Schema::new(vec![Field::new("col", DataType::LargeUtf8, false)])),
+/// vec![Arc::new(LargeStringArray::from_iter_values(vec!["a", "b"]))]
/// )
/// .unwrap();
///
@@ -3092,106 +3097,188 @@ mod tests {
}
#[test]
- fn arrow_writer_dict_and_native_compatibility() {
- let schema = Arc::new(Schema::new(vec![Field::new(
- "a",
- DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
- false,
- )]));
+ fn arrow_writer_test_type_compatibility() {
+ fn ensure_compatible_write<T1, T2>(array1: T1, array2: T2, expected_result: T1)
+ where
+ T1: Array + 'static,
+ T2: Array + 'static,
+ {
+ let schema1 = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ array1.data_type().clone(),
+ false,
+ )]));
+
+ let file = tempfile().unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), schema1.clone(), None).unwrap();
- let rb1 = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(DictionaryArray::new(
- UInt8Array::from_iter_values(vec![0, 1, 0]),
+ let rb1 = RecordBatch::try_new(schema1.clone(), vec![Arc::new(array1)]).unwrap();
+ writer.write(&rb1).unwrap();
+
+ let schema2 = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ array2.data_type().clone(),
+ false,
+ )]));
+ let rb2 = RecordBatch::try_new(schema2, vec![Arc::new(array2)]).unwrap();
+ writer.write(&rb2).unwrap();
+
+ writer.close().unwrap();
+
+ let mut record_batch_reader =
+ ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
+ let actual_batch = record_batch_reader.next().unwrap().unwrap();
+
+ let expected_batch =
+ RecordBatch::try_new(schema1, vec![Arc::new(expected_result)]).unwrap();
+ assert_eq!(actual_batch, expected_batch);
+ }
+
+ // check compatibility between native and dictionaries
+
+ ensure_compatible_write(
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet"])),
+ ),
+ StringArray::from_iter_values(vec!["barquet"]),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0, 1]),
Arc::new(StringArray::from_iter_values(vec!["parquet",
"barquet"])),
- ))],
- )
- .unwrap();
+ ),
+ );
- let file = tempfile().unwrap();
- let mut writer =
- ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
- writer.write(&rb1).unwrap();
-
- // check can append another record batch where the field has the same type
- // as the dictionary values from the first batch
- let schema2 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
- let rb2 = RecordBatch::try_new(
- schema2,
- vec![Arc::new(StringArray::from_iter_values(vec![
- "barquet", "curious",
- ]))],
- )
- .unwrap();
- writer.write(&rb2).unwrap();
+ ensure_compatible_write(
+ StringArray::from_iter_values(vec!["parquet"]),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["barquet"])),
+ ),
+ StringArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- writer.close().unwrap();
+ // check compatibility between dictionaries with different key types
- let mut record_batch_reader =
- ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
- let actual_batch = record_batch_reader.next().unwrap().unwrap();
+ ensure_compatible_write(
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet"])),
+ ),
+ DictionaryArray::new(
+ UInt16Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["barquet"])),
+ ),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0, 1]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet",
"barquet"])),
+ ),
+ );
- let expected_batch = RecordBatch::try_new(
- schema,
- vec![Arc::new(DictionaryArray::new(
- UInt8Array::from_iter_values(vec![0, 1, 0, 1, 2]),
- Arc::new(StringArray::from_iter_values(vec![
- "parquet", "barquet", "curious",
- ])),
- ))],
- )
- .unwrap();
+ // check compatibility between dictionaries with different value types
+ ensure_compatible_write(
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet"])),
+ ),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(LargeStringArray::from_iter_values(vec!["barquet"])),
+ ),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0, 1]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet",
"barquet"])),
+ ),
+ );
- assert_eq!(actual_batch, expected_batch)
- }
+ // check compatibility between a dictionary and a native array with a different type
+ ensure_compatible_write(
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet"])),
+ ),
+ LargeStringArray::from_iter_values(vec!["barquet"]),
+ DictionaryArray::new(
+ UInt8Array::from_iter_values(vec![0, 1]),
+ Arc::new(StringArray::from_iter_values(vec!["parquet",
"barquet"])),
+ ),
+ );
- #[test]
- fn arrow_writer_native_and_dict_compatibility() {
- let schema1 = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
- let rb1 = RecordBatch::try_new(
- schema1.clone(),
- vec![Arc::new(StringArray::from_iter_values(vec![
- "parquet", "barquet",
- ]))],
- )
- .unwrap();
+ // check compatibility for string types
- let file = tempfile().unwrap();
- let mut writer =
- ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), None).unwrap();
- writer.write(&rb1).unwrap();
+ ensure_compatible_write(
+ StringArray::from_iter_values(vec!["parquet"]),
+ LargeStringArray::from_iter_values(vec!["barquet"]),
+ StringArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- let schema2 = Arc::new(Schema::new(vec![Field::new(
- "a",
- DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
- false,
- )]));
+ ensure_compatible_write(
+ LargeStringArray::from_iter_values(vec!["parquet"]),
+ StringArray::from_iter_values(vec!["barquet"]),
+ LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- let rb2 = RecordBatch::try_new(
- schema2.clone(),
- vec![Arc::new(DictionaryArray::new(
- UInt8Array::from_iter_values(vec![0, 1, 0]),
- Arc::new(StringArray::from_iter_values(vec!["barquet",
"curious"])),
- ))],
- )
- .unwrap();
- writer.write(&rb2).unwrap();
+ ensure_compatible_write(
+ StringArray::from_iter_values(vec!["parquet"]),
+ StringViewArray::from_iter_values(vec!["barquet"]),
+ StringArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- writer.close().unwrap();
+ ensure_compatible_write(
+ StringViewArray::from_iter_values(vec!["parquet"]),
+ StringArray::from_iter_values(vec!["barquet"]),
+ StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- let mut record_batch_reader =
- ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
- let actual_batch = record_batch_reader.next().unwrap().unwrap();
+ ensure_compatible_write(
+ LargeStringArray::from_iter_values(vec!["parquet"]),
+ StringViewArray::from_iter_values(vec!["barquet"]),
+ LargeStringArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- let expected_batch = RecordBatch::try_new(
- schema1,
- vec![Arc::new(StringArray::from_iter_values(vec![
- "parquet", "barquet", "barquet", "curious", "barquet",
- ]))],
- )
- .unwrap();
+ ensure_compatible_write(
+ StringViewArray::from_iter_values(vec!["parquet"]),
+ LargeStringArray::from_iter_values(vec!["barquet"]),
+ StringViewArray::from_iter_values(vec!["parquet", "barquet"]),
+ );
- assert_eq!(actual_batch, expected_batch)
+ // check compatibility for binary types
+
+ ensure_compatible_write(
+ BinaryArray::from_iter_values(vec![b"parquet"]),
+ LargeBinaryArray::from_iter_values(vec![b"barquet"]),
+ BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
+
+ ensure_compatible_write(
+ LargeBinaryArray::from_iter_values(vec![b"parquet"]),
+ BinaryArray::from_iter_values(vec![b"barquet"]),
+ LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
+
+ ensure_compatible_write(
+ BinaryArray::from_iter_values(vec![b"parquet"]),
+ BinaryViewArray::from_iter_values(vec![b"barquet"]),
+ BinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
+
+ ensure_compatible_write(
+ BinaryViewArray::from_iter_values(vec![b"parquet"]),
+ BinaryArray::from_iter_values(vec![b"barquet"]),
+ BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
+
+ ensure_compatible_write(
+ BinaryViewArray::from_iter_values(vec![b"parquet"]),
+ LargeBinaryArray::from_iter_values(vec![b"barquet"]),
+ BinaryViewArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
+
+ ensure_compatible_write(
+ LargeBinaryArray::from_iter_values(vec![b"parquet"]),
+ BinaryViewArray::from_iter_values(vec![b"barquet"]),
+ LargeBinaryArray::from_iter_values(vec![b"parquet", b"barquet"]),
+ );
}
#[test]