This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 291e6e575c Add arrow-avro support for Impala Nullability (#7954)
291e6e575c is described below
commit 291e6e575c727a98ee52b617da0c8de64a821e09
Author: Veronica Manchola <[email protected]>
AuthorDate: Mon Jul 21 11:20:14 2025 -0400
Add arrow-avro support for Impala Nullability (#7954)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
- Related to https://github.com/apache/arrow-rs/pull/6965
# Rationale for this change
This change introduces support for Avro files generated by systems like
Impala, which use a specific convention for representing nullable
fields. In Avro, nullability is typically represented as a union of a
type and `null`. This PR updates the Avro reader to correctly interpret
these schemas, ensuring proper handling of nullable data and improving
interoperability with Impala-generated data.
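For illustration, here is a minimal, hypothetical Avro record schema
showing both orderings of the nullable union: the conventional
`null`-first form and the Impala-style `null`-second form this PR adds
support for (the record and field names are made up):

```json
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "standard_style", "type": ["null", "long"]},
    {"name": "impala_style", "type": ["long", "null"]}
  ]
}
```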
# What changes are included in this PR?
This pull request introduces several changes to support Impala-style
nullability in the Avro reader:
- The Avro schema parser has been updated to recognize unions where
`null` is the second type (e.g., `['type', 'null']`) as a nullable field.
- Logic has been added to handle this nullability convention during Avro
decoding.
- New tests are included to verify that Avro files using this
nullability format are read correctly, and that strict mode rejects
them with an error (a brief usage sketch follows this list).
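As a usage sketch (based on the `ReaderBuilder` options exercised by
the new tests; the crate path, file path, and batch size are
illustrative), enabling strict mode makes building a reader over an
Impala-style file fail with a `SchemaError` instead of reading it:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow_avro::reader::ReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative path to an Impala-generated Avro file whose schema
    // contains unions of the form ['T', 'null'].
    let file = File::open("avro/nullable.impala.avro")?;

    // With strict mode off (the default) these unions are read as
    // nullable columns; with it on, `build` fails with
    // "Found Avro union of the form ['T','null'], which is disallowed
    // in strict_mode".
    let reader = ReaderBuilder::new()
        .with_batch_size(1024)
        .with_utf8_view(false)
        .with_strict_mode(true)
        .build(BufReader::new(file))?;

    for batch in reader {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```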
# Are these changes tested?
Yes, I added new test cases covering these changes:
`test_nonnullable_impala`, `test_nonnullable_impala_strict`,
`test_nullable_impala`, and `test_nullable_impala_strict`.
# Are there any user-facing changes?
N/A
---------
Co-authored-by: Connor Sanders <[email protected]>
---
arrow-avro/src/codec.rs | 126 +++++++++++--
arrow-avro/src/reader/mod.rs | 391 +++++++++++++++++++++++++++++++++++++++-
arrow-avro/src/reader/record.rs | 36 ++--
3 files changed, 508 insertions(+), 45 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 88b30a6d49..bd265503d7 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -148,7 +148,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
match schema {
Schema::Complex(ComplexType::Record(r)) => {
let mut resolver = Resolver::default();
- let data_type = make_data_type(schema, None, &mut resolver, false)?;
+ let data_type = make_data_type(schema, None, &mut resolver, false, false)?;
Ok(AvroField {
data_type,
name: r.name.to_string(),
@@ -161,6 +161,60 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
}
}
+/// Builder for an [`AvroField`]
+#[derive(Debug)]
+pub struct AvroFieldBuilder<'a> {
+ schema: &'a Schema<'a>,
+ use_utf8view: bool,
+ strict_mode: bool,
+}
+
+impl<'a> AvroFieldBuilder<'a> {
+ /// Creates a new [`AvroFieldBuilder`]
+ pub fn new(schema: &'a Schema<'a>) -> Self {
+ Self {
+ schema,
+ use_utf8view: false,
+ strict_mode: false,
+ }
+ }
+
+ /// Enable or disable Utf8View support
+ pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
+ self.use_utf8view = use_utf8view;
+ self
+ }
+
+ /// Enable or disable strict mode.
+ pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+ self.strict_mode = strict_mode;
+ self
+ }
+
+ /// Build an [`AvroField`] from the builder
+ pub fn build(self) -> Result<AvroField, ArrowError> {
+ match self.schema {
+ Schema::Complex(ComplexType::Record(r)) => {
+ let mut resolver = Resolver::default();
+ let data_type = make_data_type(
+ self.schema,
+ None,
+ &mut resolver,
+ self.use_utf8view,
+ self.strict_mode,
+ )?;
+ Ok(AvroField {
+ name: r.name.to_string(),
+ data_type,
+ })
+ }
+ _ => Err(ArrowError::ParseError(format!(
+ "Expected a Record schema to build an AvroField, but got {:?}",
+ self.schema
+ ))),
+ }
+ }
+}
/// An Avro encoding
///
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
@@ -409,6 +463,7 @@ fn make_data_type<'a>(
namespace: Option<&'a str>,
resolver: &mut Resolver<'a>,
use_utf8view: bool,
+ strict_mode: bool,
) -> Result<AvroDataType, ArrowError> {
match schema {
Schema::TypeName(TypeName::Primitive(p)) => {
@@ -428,12 +483,20 @@ fn make_data_type<'a>(
.position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
match (f.len() == 2, null) {
(true, Some(0)) => {
- let mut field = make_data_type(&f[1], namespace, resolver, use_utf8view)?;
+ let mut field =
+ make_data_type(&f[1], namespace, resolver, use_utf8view, strict_mode)?;
field.nullability = Some(Nullability::NullFirst);
Ok(field)
}
(true, Some(1)) => {
- let mut field = make_data_type(&f[0], namespace, resolver, use_utf8view)?;
+ if strict_mode {
+ return Err(ArrowError::SchemaError(
+ "Found Avro union of the form ['T','null'], which
is disallowed in strict_mode"
+ .to_string(),
+ ));
+ }
+ let mut field =
+ make_data_type(&f[0], namespace, resolver, use_utf8view, strict_mode)?;
field.nullability = Some(Nullability::NullSecond);
Ok(field)
}
@@ -456,6 +519,7 @@ fn make_data_type<'a>(
namespace,
resolver,
use_utf8view,
+ strict_mode,
)?,
})
})
@@ -469,8 +533,13 @@ fn make_data_type<'a>(
Ok(field)
}
ComplexType::Array(a) => {
- let mut field =
- make_data_type(a.items.as_ref(), namespace, resolver, use_utf8view)?;
+ let mut field = make_data_type(
+ a.items.as_ref(),
+ namespace,
+ resolver,
+ use_utf8view,
+ strict_mode,
+ )?;
Ok(AvroDataType {
nullability: None,
metadata: a.attributes.field_metadata(),
@@ -535,7 +604,8 @@ fn make_data_type<'a>(
Ok(field)
}
ComplexType::Map(m) => {
- let val = make_data_type(&m.values, namespace, resolver, use_utf8view)?;
+ let val =
+ make_data_type(&m.values, namespace, resolver, use_utf8view, strict_mode)?;
Ok(AvroDataType {
nullability: None,
metadata: m.attributes.field_metadata(),
@@ -549,6 +619,7 @@ fn make_data_type<'a>(
namespace,
resolver,
use_utf8view,
+ strict_mode,
)?;
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
@@ -630,7 +701,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::Date32));
}
@@ -640,7 +711,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimeMillis));
}
@@ -650,7 +721,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimeMicros));
}
@@ -660,7 +731,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimestampMillis(true)));
}
@@ -670,7 +741,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimestampMicros(true)));
}
@@ -680,7 +751,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimestampMillis(false)));
}
@@ -690,7 +761,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::TimestampMicros(false)));
}
@@ -745,7 +816,7 @@ mod tests {
let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert_eq!(
result.metadata.get("logicalType"),
@@ -758,7 +829,7 @@ mod tests {
let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, true).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap();
assert!(matches!(result.codec, Codec::Utf8View));
}
@@ -768,7 +839,7 @@ mod tests {
let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
assert!(matches!(result.codec, Codec::Utf8));
}
@@ -796,7 +867,7 @@ mod tests {
let schema = Schema::Complex(ComplexType::Record(record));
let mut resolver = Resolver::default();
- let result = make_data_type(&schema, None, &mut resolver, true).unwrap();
+ let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap();
if let Codec::Struct(fields) = &result.codec {
let first_field_codec = &fields[0].data_type().codec;
@@ -805,4 +876,25 @@ mod tests {
panic!("Expected Struct codec");
}
}
+
+ #[test]
+ fn test_union_with_strict_mode() {
+ let schema = Schema::Union(vec![
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+ ]);
+
+ let mut resolver = Resolver::default();
+ let result = make_data_type(&schema, None, &mut resolver, false, true);
+
+ assert!(result.is_err());
+ match result {
+ Err(ArrowError::SchemaError(msg)) => {
+ assert!(msg.contains(
+ "Found Avro union of the form ['T','null'], which is
disallowed in strict_mode"
+ ));
+ }
+ _ => panic!("Expected SchemaError"),
+ }
+ }
}
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 5059e41ff0..3bc7d94b7c 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -86,7 +86,7 @@
//! ```
//!
-use crate::codec::AvroField;
+use crate::codec::AvroFieldBuilder;
use crate::schema::Schema as AvroSchema;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
@@ -221,12 +221,11 @@ impl ReaderBuilder {
}
fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
- let root_field = AvroField::try_from(schema)?;
- RecordDecoder::try_new_with_options(
- root_field.data_type(),
- self.utf8_view,
- self.strict_mode,
- )
+ let root_field = AvroFieldBuilder::new(schema)
+ .with_utf8view(self.utf8_view)
+ .with_strict_mode(self.strict_mode)
+ .build()?;
+ RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
}
fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
@@ -395,8 +394,12 @@ mod test {
use crate::compression::CompressionCodec;
use crate::reader::record::RecordDecoder;
use crate::reader::vlq::VLQDecoder;
- use crate::reader::{read_header, Decoder, ReaderBuilder};
+ use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
use crate::test_util::arrow_test_data;
+ use arrow_array::builder::{
+ Float64Builder, Int32Builder, ListBuilder, MapBuilder, StringBuilder, StructBuilder,
+ };
+
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
use arrow_array::*;
use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema};
@@ -422,6 +425,19 @@ mod test {
arrow::compute::concat_batches(&schema, &batches).unwrap()
}
+ fn read_file_strict(
+ path: &str,
+ batch_size: usize,
+ utf8_view: bool,
+ ) -> Result<Reader<BufReader<File>>, ArrowError> {
+ let file = File::open(path).unwrap();
+ ReaderBuilder::new()
+ .with_batch_size(batch_size)
+ .with_utf8_view(utf8_view)
+ .with_strict_mode(true)
+ .build(BufReader::new(file))
+ }
+
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
mut decoder: Decoder,
mut input: S,
@@ -857,4 +873,363 @@ mod test {
.unwrap();
assert_eq!(&expected_uuid_array, uuid_array);
}
+
+ #[test]
+ fn test_nonnullable_impala() {
+ let file = arrow_test_data("avro/nonnullable.impala.avro");
+ let id = Int64Array::from(vec![Some(8)]);
+ let mut int_array_builder = ListBuilder::new(Int32Builder::new());
+ {
+ let vb = int_array_builder.values();
+ vb.append_value(-1);
+ }
+ int_array_builder.append(true); // finalize one sub-list
+ let int_array = int_array_builder.finish();
+ let mut iaa_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+ {
+ let inner_list_builder = iaa_builder.values();
+ {
+ let vb = inner_list_builder.values();
+ vb.append_value(-1);
+ vb.append_value(-2);
+ }
+ inner_list_builder.append(true);
+ inner_list_builder.append(true);
+ }
+ iaa_builder.append(true);
+ let int_array_array = iaa_builder.finish();
+ use arrow_array::builder::MapFieldNames;
+ let field_names = MapFieldNames {
+ entry: "entries".to_string(),
+ key: "key".to_string(),
+ value: "value".to_string(),
+ };
+ let mut int_map_builder =
+ MapBuilder::new(Some(field_names), StringBuilder::new(), Int32Builder::new());
+ {
+ let (keys, vals) = int_map_builder.entries();
+ keys.append_value("k1");
+ vals.append_value(-1);
+ }
+ int_map_builder.append(true).unwrap(); // finalize map for row 0
+ let int_map = int_map_builder.finish();
+ let field_names2 = MapFieldNames {
+ entry: "entries".to_string(),
+ key: "key".to_string(),
+ value: "value".to_string(),
+ };
+ let mut ima_builder = ListBuilder::new(MapBuilder::new(
+ Some(field_names2),
+ StringBuilder::new(),
+ Int32Builder::new(),
+ ));
+ {
+ let map_builder = ima_builder.values();
+ map_builder.append(true).unwrap();
+ {
+ let (keys, vals) = map_builder.entries();
+ keys.append_value("k1");
+ vals.append_value(1);
+ }
+ map_builder.append(true).unwrap();
+ map_builder.append(true).unwrap();
+ map_builder.append(true).unwrap();
+ }
+ ima_builder.append(true);
+ let int_map_array_ = ima_builder.finish();
+ let mut nested_sb = StructBuilder::new(
+ vec![
+ Arc::new(Field::new("a", DataType::Int32, true)),
+ Arc::new(Field::new(
+ "B",
+ DataType::List(Arc::new(Field::new("item",
DataType::Int32, true))),
+ true,
+ )),
+ Arc::new(Field::new(
+ "c",
+ DataType::Struct(
+ vec![Field::new(
+ "D",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Struct(
+ vec![
+ Field::new("e", DataType::Int32,
true),
+ Field::new("f", DataType::Utf8,
true),
+ ]
+ .into(),
+ ),
+ true,
+ ))),
+ true,
+ ))),
+ true,
+ )]
+ .into(),
+ ),
+ true,
+ )),
+ Arc::new(Field::new(
+ "G",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(
+ vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new(
+ "value",
+ DataType::Struct(
+ vec![Field::new(
+ "h",
+ DataType::Struct(
+ vec![Field::new(
+ "i",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Float64,
+ true,
+ ))),
+ true,
+ )]
+ .into(),
+ ),
+ true,
+ )]
+ .into(),
+ ),
+ true,
+ ),
+ ]
+ .into(),
+ ),
+ false,
+ )),
+ false,
+ ),
+ true,
+ )),
+ ],
+ vec![
+ Box::new(Int32Builder::new()),
+ Box::new(ListBuilder::new(Int32Builder::new())),
+ {
+ let d_field = Field::new(
+ "D",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Struct(
+ vec![
+ Field::new("e", DataType::Int32, true),
+ Field::new("f", DataType::Utf8, true),
+ ]
+ .into(),
+ ),
+ true,
+ ))),
+ true,
+ ))),
+ true,
+ );
+ Box::new(StructBuilder::new(
+ vec![Arc::new(d_field)],
+ vec![Box::new({
+ let ef_struct_builder = StructBuilder::new(
+ vec![
+ Arc::new(Field::new("e", DataType::Int32,
true)),
+ Arc::new(Field::new("f", DataType::Utf8,
true)),
+ ],
+ vec![
+ Box::new(Int32Builder::new()),
+ Box::new(StringBuilder::new()),
+ ],
+ );
+ let list_of_ef = ListBuilder::new(ef_struct_builder);
+ ListBuilder::new(list_of_ef)
+ })],
+ ))
+ },
+ {
+ let map_field_names = MapFieldNames {
+ entry: "entries".to_string(),
+ key: "key".to_string(),
+ value: "value".to_string(),
+ };
+ let i_list_builder = ListBuilder::new(Float64Builder::new());
+ let h_struct = StructBuilder::new(
+ vec![Arc::new(Field::new(
+ "i",
+ DataType::List(Arc::new(Field::new("item",
DataType::Float64, true))),
+ true,
+ ))],
+ vec![Box::new(i_list_builder)],
+ );
+ let g_value_builder = StructBuilder::new(
+ vec![Arc::new(Field::new(
+ "h",
+ DataType::Struct(
+ vec![Field::new(
+ "i",
+ DataType::List(Arc::new(Field::new(
+ "item",
+ DataType::Float64,
+ true,
+ ))),
+ true,
+ )]
+ .into(),
+ ),
+ true,
+ ))],
+ vec![Box::new(h_struct)],
+ );
+ Box::new(MapBuilder::new(
+ Some(map_field_names),
+ StringBuilder::new(),
+ g_value_builder,
+ ))
+ },
+ ],
+ );
+ nested_sb.append(true);
+ {
+ let a_builder = nested_sb.field_builder::<Int32Builder>(0).unwrap();
+ a_builder.append_value(-1);
+ }
+ {
+ let b_builder = nested_sb
+ .field_builder::<ListBuilder<Int32Builder>>(1)
+ .unwrap();
+ {
+ let vb = b_builder.values();
+ vb.append_value(-1);
+ }
+ b_builder.append(true);
+ }
+ {
+ let c_struct_builder = nested_sb.field_builder::<StructBuilder>(2).unwrap();
+ c_struct_builder.append(true);
+ let d_list_builder = c_struct_builder
+ .field_builder::<ListBuilder<ListBuilder<StructBuilder>>>(0)
+ .unwrap();
+ {
+ let sub_list_builder = d_list_builder.values();
+ {
+ let ef_struct = sub_list_builder.values();
+ ef_struct.append(true);
+ {
+ let e_b = ef_struct.field_builder::<Int32Builder>(0).unwrap();
+ e_b.append_value(-1);
+ let f_b = ef_struct.field_builder::<StringBuilder>(1).unwrap();
+ f_b.append_value("nonnullable");
+ }
+ sub_list_builder.append(true);
+ }
+ d_list_builder.append(true);
+ }
+ }
+ {
+ let g_map_builder = nested_sb
+ .field_builder::<MapBuilder<StringBuilder, StructBuilder>>(3)
+ .unwrap();
+ g_map_builder.append(true).unwrap();
+ }
+ let nested_struct = nested_sb.finish();
+ let expected = RecordBatch::try_from_iter_with_nullable([
+ ("ID", Arc::new(id) as Arc<dyn Array>, true),
+ ("Int_Array", Arc::new(int_array), true),
+ ("int_array_array", Arc::new(int_array_array), true),
+ ("Int_Map", Arc::new(int_map), true),
+ ("int_map_array", Arc::new(int_map_array_), true),
+ ("nested_Struct", Arc::new(nested_struct), true),
+ ])
+ .unwrap();
+ let batch_large = read_file(&file, 8, false);
+ assert_eq!(batch_large, expected, "Mismatch for batch_size=8");
+ let batch_small = read_file(&file, 3, false);
+ assert_eq!(batch_small, expected, "Mismatch for batch_size=3");
+ }
+
+ #[test]
+ fn test_nonnullable_impala_strict() {
+ let file = arrow_test_data("avro/nonnullable.impala.avro");
+ let err = read_file_strict(&file, 8, false).unwrap_err();
+ assert!(err.to_string().contains(
+ "Found Avro union of the form ['T','null'], which is disallowed in
strict_mode"
+ ));
+ }
+
+ #[test]
+ fn test_nullable_impala() {
+ let file = arrow_test_data("avro/nullable.impala.avro");
+ let batch1 = read_file(&file, 3, false);
+ let batch2 = read_file(&file, 8, false);
+ assert_eq!(batch1, batch2);
+ let batch = batch1;
+ assert_eq!(batch.num_rows(), 7);
+ let id_array = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .expect("id column should be an Int64Array");
+ let expected_ids = [1, 2, 3, 4, 5, 6, 7];
+ for (i, &expected_id) in expected_ids.iter().enumerate() {
+ assert_eq!(id_array.value(i), expected_id, "Mismatch in id at row {i}",);
+ }
+ let int_array = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .expect("int_array column should be a ListArray");
+ {
+ let offsets = int_array.value_offsets();
+ let start = offsets[0] as usize;
+ let end = offsets[1] as usize;
+ let values = int_array
+ .values()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("Values of int_array should be an Int32Array");
+ let row0: Vec<Option<i32>> = (start..end).map(|i| Some(values.value(i))).collect();
+ assert_eq!(
+ row0,
+ vec![Some(1), Some(2), Some(3)],
+ "Mismatch in int_array row 0"
+ );
+ }
+ let nested_struct = batch
+ .column(5)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .expect("nested_struct column should be a StructArray");
+ let a_array = nested_struct
+ .column_by_name("A")
+ .expect("Field A should exist in nested_struct")
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("Field A should be an Int32Array");
+ assert_eq!(a_array.value(0), 1, "Mismatch in nested_struct.A at row 0");
+ assert!(
+ !a_array.is_valid(1),
+ "Expected null in nested_struct.A at row 1"
+ );
+ assert!(
+ !a_array.is_valid(3),
+ "Expected null in nested_struct.A at row 3"
+ );
+ assert_eq!(a_array.value(6), 7, "Mismatch in nested_struct.A at row 6");
+ }
+
+ #[test]
+ fn test_nullable_impala_strict() {
+ let file = arrow_test_data("avro/nullable.impala.avro");
+ let err = read_file_strict(&file, 8, false).unwrap_err();
+ assert!(err.to_string().contains(
+ "Found Avro union of the form ['T','null'], which is disallowed in
strict_mode"
+ ));
+ }
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 2ef382a226..180afcd2d8 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -43,7 +43,6 @@ const DEFAULT_CAPACITY: usize = 1024;
pub(crate) struct RecordDecoderBuilder<'a> {
data_type: &'a AvroDataType,
use_utf8view: bool,
- strict_mode: bool,
}
impl<'a> RecordDecoderBuilder<'a> {
@@ -51,7 +50,6 @@ impl<'a> RecordDecoderBuilder<'a> {
Self {
data_type,
use_utf8view: false,
- strict_mode: false,
}
}
@@ -60,14 +58,9 @@ impl<'a> RecordDecoderBuilder<'a> {
self
}
- pub(crate) fn with_strict_mode(mut self, strict_mode: bool) -> Self {
- self.strict_mode = strict_mode;
- self
- }
-
/// Builds the `RecordDecoder`.
pub(crate) fn build(self) -> Result<RecordDecoder, ArrowError> {
- RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view, self.strict_mode)
+ RecordDecoder::try_new_with_options(self.data_type, self.use_utf8view)
}
}
@@ -77,7 +70,6 @@ pub(crate) struct RecordDecoder {
schema: SchemaRef,
fields: Vec<Decoder>,
use_utf8view: bool,
- strict_mode: bool,
}
impl RecordDecoder {
@@ -90,7 +82,6 @@ impl RecordDecoder {
pub(crate) fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
RecordDecoderBuilder::new(data_type)
.with_utf8_view(true)
- .with_strict_mode(true)
.build()
}
@@ -109,14 +100,12 @@ impl RecordDecoder {
pub(crate) fn try_new_with_options(
data_type: &AvroDataType,
use_utf8view: bool,
- strict_mode: bool,
) -> Result<Self, ArrowError> {
match Decoder::try_new(data_type)? {
Decoder::Record(fields, encodings) => Ok(Self {
schema: Arc::new(ArrowSchema::new(fields)),
fields: encodings,
use_utf8view,
- strict_mode,
}),
encoding => Err(ArrowError::ParseError(format!(
"Expected record got {encoding:?}"
@@ -331,7 +320,6 @@ impl Decoder {
}
Self::Array(_, offsets, e) => {
offsets.push_length(0);
- e.append_null();
}
Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
Self::Map(_, _koff, moff, _, _) => {
@@ -344,7 +332,10 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO),
Self::Enum(indices, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
- Self::Nullable(_, _, _) => unreachable!("Nulls cannot be nested"),
+ Self::Nullable(_, null_buffer, inner) => {
+ null_buffer.append(false);
+ inner.append_null();
+ }
}
}
@@ -431,12 +422,17 @@ impl Decoder {
let nanos = (millis as i64) * 1_000_000;
builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
}
- Self::Nullable(nullability, nulls, e) => {
- let is_valid = buf.get_bool()? == matches!(nullability, Nullability::NullFirst);
- nulls.append(is_valid);
- match is_valid {
- true => e.decode(buf)?,
- false => e.append_null(),
+ Self::Nullable(order, nb, encoding) => {
+ let branch = buf.read_vlq()?;
+ let is_not_null = match *order {
+ Nullability::NullFirst => branch != 0,
+ Nullability::NullSecond => branch == 0,
+ };
+ nb.append(is_not_null);
+ if is_not_null {
+ encoding.decode(buf)?;
+ } else {
+ encoding.append_null();
}
}
}