This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new ae934888bb fix: resolution of complex type variants in Avro unions (#9328)
ae934888bb is described below
commit ae934888bb87196d272340bc528e93dd516bc9e6
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Fri Feb 27 20:09:41 2026 +0200
fix: resolution of complex type variants in Avro unions (#9328)
# Which issue does this PR close?
- Closes #9336
# Rationale for this change
When an Avro reader schema contains a union type that must be resolved
against the corresponding type in the writer schema, resolution information
other than primitive type promotions is not handled properly when the
decoder is created.
For example, suppose the reader schema has a nullable record field whose
record type adds a nested field on top of the fields defined in the writer
schema. In that case record type resolution must be applied, using a
projection that fills the added field with its default value.
# What changes are included in this PR?
Extend the union resolution information in the decoder with per-variant
data for enum remapping and record projection. The `Projector` data
structure, together with its `Skipper` decoders, forms part of this
information, which necessitated some refactoring.
# Are these changes tested?
TODO:
- [x] Debug failing tests including a busy-loop failure mode.
- [ ] Add more unit tests exercising the complex resolutions.
# Are there any user-facing changes?
No.
---
arrow-avro/src/codec.rs | 415 +++++++++++++++++++------
arrow-avro/src/reader/mod.rs | 361 ++++++++++++++++++++--
arrow-avro/src/reader/record.rs | 661 +++++++++++++++++++++++++---------------
arrow-avro/src/schema.rs | 28 +-
4 files changed, 1097 insertions(+), 368 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index d54c6602da..d20a71425d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -141,7 +141,7 @@ impl Display for Promotion {
pub(crate) struct ResolvedUnion {
/// For each writer branch index, the reader branch index and how to read
it.
/// `None` means the writer branch doesn't resolve against the reader.
- pub(crate) writer_to_reader: Arc<[Option<(usize, Promotion)>]>,
+ pub(crate) writer_to_reader: Arc<[Option<(usize, ResolutionInfo)>]>,
/// Whether the writer schema at this site is a union
pub(crate) writer_is_union: bool,
/// Whether the reader schema at this site is a union
@@ -1748,9 +1748,21 @@ impl<'a> Maker<'a> {
nullable_union_variants(writer_variants),
nullable_union_variants(reader_variants),
) {
- (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
- let mut dt = self.make_data_type(w_nonnull,
Some(r_nonnull), namespace)?;
+ (Some((w_nb, w_nonnull)), Some((r_nb, r_nonnull))) => {
+ let mut dt = self.resolve_type(w_nonnull, r_nonnull,
namespace)?;
+ let mut writer_to_reader = vec![None, None];
+ writer_to_reader[w_nb.non_null_index()] = Some((
+ r_nb.non_null_index(),
+ dt.resolution
+ .take()
+
.unwrap_or(ResolutionInfo::Promotion(Promotion::Direct)),
+ ));
dt.nullability = Some(w_nb);
+ dt.resolution =
Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: Arc::from(writer_to_reader),
+ writer_is_union: true,
+ reader_is_union: true,
+ }));
#[cfg(feature = "avro_custom_types")]
Self::propagate_nullability_into_ree(&mut dt, w_nb);
Ok(dt)
@@ -1759,12 +1771,17 @@ impl<'a> Maker<'a> {
}
}
(Schema::Union(writer_variants), reader_non_union) => {
- let writer_to_reader: Vec<Option<(usize, Promotion)>> =
writer_variants
+ let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> =
writer_variants
.iter()
.map(|writer| {
self.resolve_type(writer, reader_non_union, namespace)
.ok()
- .map(|tmp| (0usize, Self::coercion_from(&tmp)))
+ .map(|tmp| {
+ let resolution = tmp
+ .resolution
+
.unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
+ (0usize, resolution)
+ })
})
.collect();
let mut dt = self.parse_type(reader_non_union, namespace)?;
@@ -1780,54 +1797,44 @@ impl<'a> Maker<'a> {
nullable_union_variants(reader_variants)
{
let mut dt = self.resolve_type(writer_non_union,
non_null_branch, namespace)?;
- let non_null_idx = match nullability {
- Nullability::NullFirst => 1,
- Nullability::NullSecond => 0,
- };
#[cfg(feature = "avro_custom_types")]
Self::propagate_nullability_into_ree(&mut dt, nullability);
dt.nullability = Some(nullability);
- let promotion = Self::coercion_from(&dt);
- dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
- writer_to_reader: Arc::from(vec![Some((non_null_idx,
promotion))]),
- writer_is_union: false,
- reader_is_union: true,
- }));
+ // Ensure resolution is set to a non-Union variant to
suppress
+ // reading the union tag which is the default behavior.
+ if dt.resolution.is_none() {
+ dt.resolution =
Some(ResolutionInfo::Promotion(Promotion::Direct));
+ }
Ok(dt)
} else {
- let mut best_match: Option<(usize, AvroDataType,
Promotion)> = None;
- for (i, variant) in reader_variants.iter().enumerate() {
- if let Ok(resolved_dt) =
- self.resolve_type(writer_non_union, variant,
namespace)
- {
- let promotion = Self::coercion_from(&resolved_dt);
- if promotion == Promotion::Direct {
- best_match = Some((i, resolved_dt, promotion));
- break;
- } else if best_match.is_none() {
- best_match = Some((i, resolved_dt, promotion));
- }
- }
- }
- let Some((match_idx, match_dt, promotion)) = best_match
else {
+ let Some((match_idx, mut match_dt)) =
+ self.find_best_union_match(writer_non_union,
reader_variants, namespace)
+ else {
return Err(ArrowError::SchemaError(
"Writer schema does not match any reader union
branch".to_string(),
));
};
- let mut children =
Vec::with_capacity(reader_variants.len());
+ // Steal the resolution info from the matching reader
branch
+ // for the Union resolution, but preserve possible
resolution
+ // information on its inner types.
+ // For other branches, resolution is irrelevant,
+ // so just parse them.
+ let resolution = match_dt
+ .resolution
+ .take()
+
.unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
let mut match_dt = Some(match_dt);
- for (i, variant) in reader_variants.iter().enumerate() {
- if i == match_idx {
- if let Some(mut dt) = match_dt.take() {
- if matches!(dt.resolution,
Some(ResolutionInfo::Promotion(_))) {
- dt.resolution = None;
- }
- children.push(dt);
+ let children = reader_variants
+ .iter()
+ .enumerate()
+ .map(|(idx, variant)| {
+ if idx == match_idx {
+ Ok(match_dt.take().unwrap())
+ } else {
+ self.parse_type(variant, namespace)
}
- } else {
- children.push(self.parse_type(variant,
namespace)?);
- }
- }
+ })
+ .collect::<Result<Vec<_>, _>>()?;
let union_fields = build_union_fields(&children)?;
let mut dt = AvroDataType::new(
Codec::Union(children.into(), union_fields,
UnionMode::Dense),
@@ -1835,7 +1842,7 @@ impl<'a> Maker<'a> {
None,
);
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
- writer_to_reader: Arc::from(vec![Some((match_idx,
promotion))]),
+ writer_to_reader: Arc::from(vec![Some((match_idx,
resolution))]),
writer_is_union: false,
reader_is_union: true,
}));
@@ -1870,34 +1877,30 @@ impl<'a> Maker<'a> {
}
}
- #[inline]
- fn coercion_from(dt: &AvroDataType) -> Promotion {
- match dt.resolution.as_ref() {
- Some(ResolutionInfo::Promotion(promotion)) => *promotion,
- _ => Promotion::Direct,
- }
- }
-
- fn find_best_promotion(
+ fn find_best_union_match(
&mut self,
writer: &Schema<'a>,
reader_variants: &[Schema<'a>],
namespace: Option<&'a str>,
- ) -> Option<(usize, Promotion)> {
- let mut first_promotion: Option<(usize, Promotion)> = None;
+ ) -> Option<(usize, AvroDataType)> {
+ let mut first_resolution = None;
for (reader_index, reader) in reader_variants.iter().enumerate() {
- if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
- let promotion = Self::coercion_from(&tmp);
- if promotion == Promotion::Direct {
- // An exact match is best, return immediately.
- return Some((reader_index, promotion));
- } else if first_promotion.is_none() {
- // Store the first valid promotion but keep searching for
a direct match.
- first_promotion = Some((reader_index, promotion));
- }
+ if let Ok(dt) = self.resolve_type(writer, reader, namespace) {
+ match &dt.resolution {
+ None | Some(ResolutionInfo::Promotion(Promotion::Direct))
=> {
+ // An exact match is best, return immediately.
+ return Some((reader_index, dt));
+ }
+ Some(_) => {
+ if first_resolution.is_none() {
+ // Store the first valid promotion but keep
searching for a direct match.
+ first_resolution = Some((reader_index, dt));
+ }
+ }
+ };
}
}
- first_promotion
+ first_resolution
}
fn resolve_unions<'s>(
@@ -1906,15 +1909,34 @@ impl<'a> Maker<'a> {
reader_variants: &'s [Schema<'a>],
namespace: Option<&'a str>,
) -> Result<AvroDataType, ArrowError> {
+ let mut resolved_reader_encodings = HashMap::new();
+ let writer_to_reader: Vec<Option<(usize, ResolutionInfo)>> =
writer_variants
+ .iter()
+ .map(|writer| {
+ self.find_best_union_match(writer, reader_variants, namespace)
+ .map(|(match_idx, mut match_dt)| {
+ let resolution = match_dt
+ .resolution
+ .take()
+
.unwrap_or(ResolutionInfo::Promotion(Promotion::Direct));
+ // TODO: check for overlapping reader variants?
+ // They should not be possible in a valid schema.
+ resolved_reader_encodings.insert(match_idx, match_dt);
+ (match_idx, resolution)
+ })
+ })
+ .collect();
let reader_encodings: Vec<AvroDataType> = reader_variants
.iter()
- .map(|reader_schema| self.parse_type(reader_schema, namespace))
+ .enumerate()
+ .map(|(reader_idx, reader_schema)| {
+ if let Some(resolved) =
resolved_reader_encodings.remove(&reader_idx) {
+ Ok(resolved)
+ } else {
+ self.parse_type(reader_schema, namespace)
+ }
+ })
.collect::<Result<_, _>>()?;
- let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
- Vec::with_capacity(writer_variants.len());
- for writer in writer_variants {
- writer_to_reader.push(self.find_best_promotion(writer,
reader_variants, namespace));
- }
let union_fields = build_union_fields(&reader_encodings)?;
let mut dt = AvroDataType::new(
Codec::Union(reader_encodings.into(), union_fields,
UnionMode::Dense),
@@ -2179,7 +2201,14 @@ impl<'a> Maker<'a> {
)?;
let writer_ns = writer_record.namespace.or(namespace);
let reader_ns = reader_record.namespace.or(namespace);
- let reader_md = reader_record.attributes.field_metadata();
+ let mut reader_md = reader_record.attributes.field_metadata();
+ reader_md.insert(
+ AVRO_NAME_METADATA_KEY.to_string(),
+ reader_record.name.to_string(),
+ );
+ if let Some(ns) = reader_ns {
+ reader_md.insert(AVRO_NAMESPACE_METADATA_KEY.to_string(),
ns.to_string());
+ }
// Build writer lookup and ambiguous alias set.
let (writer_lookup, ambiguous_writer_aliases) =
Self::build_writer_lookup(writer_record);
let mut writer_to_reader: Vec<Option<usize>> = vec![None;
writer_record.fields.len()];
@@ -2620,7 +2649,15 @@ mod tests {
assert!(matches!(result.codec, Codec::Float64));
assert_eq!(
result.resolution,
- Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
+ Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: [
+ None,
+ Some((0,
ResolutionInfo::Promotion(Promotion::IntToDouble)))
+ ]
+ .into(),
+ writer_is_union: true,
+ reader_is_union: true,
+ }))
);
assert_eq!(result.nullability, Some(Nullability::NullFirst));
}
@@ -2642,7 +2679,10 @@ mod tests {
assert!(resolved.writer_is_union && !resolved.reader_is_union);
assert_eq!(
resolved.writer_to_reader.as_ref(),
- &[Some((0, Promotion::StringToBytes)), None]
+ &[
+ Some((0, ResolutionInfo::Promotion(Promotion::StringToBytes))),
+ None
+ ]
);
}
@@ -2662,7 +2702,7 @@ mod tests {
assert!(!resolved.writer_is_union && resolved.reader_is_union);
assert_eq!(
resolved.writer_to_reader.as_ref(),
- &[Some((0, Promotion::Direct))]
+ &[Some((0, ResolutionInfo::Promotion(Promotion::Direct)))]
);
}
@@ -2682,7 +2722,200 @@ mod tests {
};
assert_eq!(
resolved.writer_to_reader.as_ref(),
- &[Some((1, Promotion::IntToLong))]
+ &[Some((1, ResolutionInfo::Promotion(Promotion::IntToLong)))]
+ );
+ }
+
+ #[test]
+ fn
test_resolve_writer_non_union_to_reader_union_preserves_inner_record_defaults()
{
+ // Writer: record Inner{a: int}
+ // Reader: union [Inner{a: int, b: int default 42}, string]
+ // The matching child (Inner) should preserve DefaultValue(Int(42)) on
field b.
+ let writer = Schema::Complex(ComplexType::Record(Record {
+ name: "Inner",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![AvroFieldSchema {
+ name: "a",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ }));
+ let reader = mk_union(vec![
+ Schema::Complex(ComplexType::Record(Record {
+ name: "Inner",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![
+ AvroFieldSchema {
+ name: "a",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default: None,
+ aliases: vec![],
+ },
+ AvroFieldSchema {
+ name: "b",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default:
Some(Value::Number(serde_json::Number::from(42))),
+ aliases: vec![],
+ },
+ ],
+ attributes: Attributes::default(),
+ })),
+ mk_primitive(PrimitiveType::String),
+ ]);
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer, Some(&reader), None)
+ .expect("resolution should succeed");
+ // Verify the union resolution structure
+ let resolved = match dt.resolution.as_ref() {
+ Some(ResolutionInfo::Union(u)) => u,
+ other => panic!("expected union resolution info, got {other:?}"),
+ };
+ assert!(!resolved.writer_is_union && resolved.reader_is_union);
+ assert_eq!(
+ resolved.writer_to_reader.len(),
+ 1,
+ "expected the non-union record to resolve to a union variant"
+ );
+ let resolution = match resolved.writer_to_reader.first().unwrap() {
+ Some((0, resolution)) => resolution,
+ other => panic!("unexpected writer-to-reader table value
{other:?}"),
+ };
+ match resolution {
+ ResolutionInfo::Record(ResolvedRecord {
+ writer_to_reader,
+ default_fields,
+ skip_fields,
+ }) => {
+ assert_eq!(writer_to_reader.len(), 1);
+ assert_eq!(writer_to_reader[0], Some(0));
+ assert_eq!(default_fields.len(), 1);
+ assert_eq!(default_fields[0], 1);
+ assert_eq!(skip_fields.len(), 1);
+ assert_eq!(skip_fields[0], None);
+ }
+ other => panic!("unexpected resolution {other:?}"),
+ }
+ // The matching child (Inner at index 0) should have field b with
DefaultValue
+ let children = match dt.codec() {
+ Codec::Union(children, _, _) => children,
+ other => panic!("expected union codec, got {other:?}"),
+ };
+ let inner_fields = match children[0].codec() {
+ Codec::Struct(f) => f,
+ other => panic!("expected struct codec for Inner, got {other:?}"),
+ };
+ assert_eq!(inner_fields.len(), 2);
+ assert_eq!(inner_fields[1].name(), "b");
+ assert_eq!(
+ inner_fields[1].data_type().resolution,
+ Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
+ "field b should have DefaultValue(Int(42)) from schema resolution"
+ );
+ }
+
+ #[test]
+ fn
test_resolve_writer_union_to_reader_union_preserves_inner_record_defaults() {
+ // Writer: record [string, Inner{a: int}]
+ // Reader: union [Inner{a: int, b: int default 42}, string]
+ // The matching child (Inner) should preserve DefaultValue(Int(42)) on
field b.
+ let writer = mk_union(vec![
+ mk_primitive(PrimitiveType::String),
+ Schema::Complex(ComplexType::Record(Record {
+ name: "Inner",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![AvroFieldSchema {
+ name: "a",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default: None,
+ aliases: vec![],
+ }],
+ attributes: Attributes::default(),
+ })),
+ ]);
+ let reader = mk_union(vec![
+ Schema::Complex(ComplexType::Record(Record {
+ name: "Inner",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![
+ AvroFieldSchema {
+ name: "a",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default: None,
+ aliases: vec![],
+ },
+ AvroFieldSchema {
+ name: "b",
+ doc: None,
+ r#type: mk_primitive(PrimitiveType::Int),
+ default:
Some(Value::Number(serde_json::Number::from(42))),
+ aliases: vec![],
+ },
+ ],
+ attributes: Attributes::default(),
+ })),
+ mk_primitive(PrimitiveType::String),
+ ]);
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer, Some(&reader), None)
+ .expect("resolution should succeed");
+ // Verify the union resolution structure
+ let resolved = match dt.resolution.as_ref() {
+ Some(ResolutionInfo::Union(u)) => u,
+ other => panic!("expected union resolution info, got {other:?}"),
+ };
+ assert!(resolved.writer_is_union && resolved.reader_is_union);
+ assert_eq!(resolved.writer_to_reader.len(), 2);
+ let resolution = match resolved.writer_to_reader[1].as_ref() {
+ Some((0, resolution)) => resolution,
+ other => panic!("unexpected writer-to-reader table value
{other:?}"),
+ };
+ match resolution {
+ ResolutionInfo::Record(ResolvedRecord {
+ writer_to_reader,
+ default_fields,
+ skip_fields,
+ }) => {
+ assert_eq!(writer_to_reader.len(), 1);
+ assert_eq!(writer_to_reader[0], Some(0));
+ assert_eq!(default_fields.len(), 1);
+ assert_eq!(default_fields[0], 1);
+ assert_eq!(skip_fields.len(), 1);
+ assert_eq!(skip_fields[0], None);
+ }
+ other => panic!("unexpected resolution {other:?}"),
+ }
+ // The matching child (Inner at index 0) should have field b with
DefaultValue
+ let children = match dt.codec() {
+ Codec::Union(children, _, _) => children,
+ other => panic!("expected union codec, got {other:?}"),
+ };
+ let inner_fields = match children[0].codec() {
+ Codec::Struct(f) => f,
+ other => panic!("expected struct codec for Inner, got {other:?}"),
+ };
+ assert_eq!(inner_fields.len(), 2);
+ assert_eq!(inner_fields[1].name(), "b");
+ assert_eq!(
+ inner_fields[1].data_type().resolution,
+ Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
+ "field b should have DefaultValue(Int(42)) from schema resolution"
);
}
@@ -2700,7 +2933,18 @@ mod tests {
let dt = maker.make_data_type(&writer, Some(&reader), None).unwrap();
assert!(matches!(dt.codec(), Codec::Utf8));
assert_eq!(dt.nullability, Some(Nullability::NullFirst));
- assert!(dt.resolution.is_none());
+ assert_eq!(
+ dt.resolution,
+ Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: [
+ None,
+ Some((0, ResolutionInfo::Promotion(Promotion::Direct)))
+ ]
+ .into(),
+ writer_is_union: true,
+ reader_is_union: true
+ }))
+ );
}
#[test]
@@ -2719,7 +2963,15 @@ mod tests {
assert_eq!(dt.nullability, Some(Nullability::NullFirst));
assert_eq!(
dt.resolution,
- Some(ResolutionInfo::Promotion(Promotion::IntToDouble))
+ Some(ResolutionInfo::Union(ResolvedUnion {
+ writer_to_reader: [
+ None,
+ Some((0,
ResolutionInfo::Promotion(Promotion::IntToDouble)))
+ ]
+ .into(),
+ writer_is_union: true,
+ reader_is_union: true
+ }))
);
}
@@ -3316,14 +3568,7 @@ mod tests {
assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
assert!(matches!(inner.codec(), Codec::Int32));
match inner.resolution.as_ref() {
- Some(ResolutionInfo::Union(info)) => {
- assert!(!info.writer_is_union, "writer should be
non-union");
- assert!(info.reader_is_union, "reader should be union");
- assert_eq!(
- info.writer_to_reader.as_ref(),
- &[Some((1, Promotion::Direct))]
- );
- }
+ Some(ResolutionInfo::Promotion(Promotion::Direct)) => {}
other => panic!("expected Union resolution, got {other:?}"),
}
} else {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index aa01f272bf..63b61b601e 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -6866,6 +6866,264 @@ mod test {
assert_eq!(int_values.value(1), 2);
}
+ #[test]
+ fn test_nested_record_field_addition() {
+ let file = arrow_test_data("avro/nested_records.avro");
+
+ // Adds fields to the writer schema:
+ // * "ns2.record2" / "f1_4"
+ // - nullable
+ // - added last
+ // - the containing "f1" field is made nullable in the reader
+ // * "ns4.record4" / "f2_3"
+ // - non-nullable with an integer default value
+ // - resolution of a record nested in an array
+ // * "ns5.record5" / "f3_0"
+ // - non-nullable with a string default value
+ // - prepended before existing fields in the schema order
+ let reader_schema = AvroSchema::new(
+ r#"
+ {
+ "type": "record",
+ "name": "record1",
+ "namespace": "ns1",
+ "fields": [
+ {
+ "name": "f1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "record2",
+ "namespace": "ns2",
+ "fields": [
+ {
+ "name": "f1_1",
+ "type": "string"
+ },
+ {
+ "name": "f1_2",
+ "type": "int"
+ },
+ {
+ "name": "f1_3",
+ "type": {
+ "type": "record",
+ "name": "record3",
+ "namespace": "ns3",
+ "fields": [
+ {
+ "name": "f1_3_1",
+ "type": "double"
+ }
+ ]
+ }
+ },
+ {
+ "name": "f1_4",
+ "type": ["null", "int"],
+ "default": null
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "f2",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "record4",
+ "namespace": "ns4",
+ "fields": [
+ {
+ "name": "f2_1",
+ "type": "boolean"
+ },
+ {
+ "name": "f2_2",
+ "type": "float"
+ },
+ {
+ "name": "f2_3",
+ "type": ["null", "int"],
+ "default": 42
+ }
+ ]
+ }
+ }
+ },
+ {
+ "name": "f3",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "record5",
+ "namespace": "ns5",
+ "fields": [
+ {
+ "name": "f3_0",
+ "type": "string",
+ "default": "lorem ipsum"
+ },
+ {
+ "name": "f3_1",
+ "type": "string"
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "f4",
+ "type": {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "record",
+ "name": "record6",
+ "namespace": "ns6",
+ "fields": [
+ {
+ "name": "f4_1",
+ "type": "long"
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ]
+ }
+ "#
+ .to_string(),
+ );
+
+ let file = File::open(&file).unwrap();
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(BufReader::new(file))
+ .expect("reader with evolved reader schema should be built
successfully");
+
+ let batch = reader
+ .next()
+ .expect("should have at least one batch")
+ .expect("reading should succeed");
+
+ assert!(batch.num_rows() > 0);
+
+ let schema = batch.schema();
+
+ let f1_field = schema.field_with_name("f1").expect("f1 field should
exist");
+ if let DataType::Struct(f1_fields) = f1_field.data_type() {
+ let (_, f1_4) = f1_fields
+ .find("f1_4")
+ .expect("f1_4 field should be present in record2");
+ assert!(f1_4.is_nullable(), "f1_4 should be nullable");
+ assert_eq!(f1_4.data_type(), &DataType::Int32, "f1_4 should be
Int32");
+ assert_eq!(
+ f1_4.metadata().get("avro.field.default"),
+ Some(&"null".to_string()),
+ "f1_4 should have null default value in metadata"
+ );
+ } else {
+ panic!("f1 should be a struct");
+ }
+
+ let f2_field = schema.field_with_name("f2").expect("f2 field should
exist");
+ if let DataType::List(f2_items_field) = f2_field.data_type() {
+ if let DataType::Struct(f2_items_fields) =
f2_items_field.data_type() {
+ let (_, f2_3) = f2_items_fields
+ .find("f2_3")
+ .expect("f2_3 field should be present in record4");
+ assert!(f2_3.is_nullable(), "f2_3 should be nullable");
+ assert_eq!(f2_3.data_type(), &DataType::Int32, "f2_3 should be
Int32");
+ assert_eq!(
+ f2_3.metadata().get("avro.field.default"),
+ Some(&"42".to_string()),
+ "f2_3 should have 42 default value in metadata"
+ );
+ } else {
+ panic!("f2 array items should be a struct");
+ }
+ } else {
+ panic!("f2 should be a list");
+ }
+
+ let f3_field = schema.field_with_name("f3").expect("f3 field should
exist");
+ assert!(f3_field.is_nullable(), "f3 should be nullable");
+ if let DataType::Struct(f3_fields) = f3_field.data_type() {
+ let (_, f3_0) = f3_fields
+ .find("f3_0")
+ .expect("f3_0 field should be present in record5");
+ assert!(!f3_0.is_nullable(), "f3_0 should be non-nullable");
+ assert_eq!(f3_0.data_type(), &DataType::Utf8, "f3_0 should be a
string");
+ assert_eq!(
+ f3_0.metadata().get("avro.field.default"),
+ Some(&"\"lorem ipsum\"".to_string()),
+ "f3_0 should have \"lorem ipsum\" default value in metadata"
+ );
+ } else {
+ panic!("f3 should be a struct");
+ }
+
+ // Verify the actual values in the columns match the expected defaults
+ let num_rows = batch.num_rows();
+
+ // Check f1_4 values (should all be null since default is null)
+ let f1_array = batch
+ .column_by_name("f1")
+ .expect("f1 column should exist")
+ .as_struct();
+ let f1_4_array = f1_array
+ .column_by_name("f1_4")
+ .expect("f1_4 column should exist in f1 struct")
+ .as_primitive::<Int32Type>();
+
+ assert_eq!(f1_4_array.null_count(), num_rows);
+
+ let f2_array = batch
+ .column_by_name("f2")
+ .expect("f2 column should exist")
+ .as_list::<i32>();
+
+ for i in 0..num_rows {
+ assert!(!f2_array.is_null(i));
+ let f2_value = f2_array.value(i);
+ let f2_record_array = f2_value.as_struct();
+ let f2_3_array = f2_record_array
+ .column_by_name("f2_3")
+ .expect("f2_3 column should exist in f2 array items")
+ .as_primitive::<Int32Type>();
+
+ for j in 0..f2_3_array.len() {
+ assert!(!f2_3_array.is_null(j));
+ assert_eq!(f2_3_array.value(j), 42);
+ }
+ }
+
+ let f3_array = batch
+ .column_by_name("f3")
+ .expect("f3 column should exist")
+ .as_struct();
+ let f3_0_array = f3_array
+ .column_by_name("f3_0")
+ .expect("f3_0 column should exist in f3 struct")
+ .as_string::<i32>();
+
+ for i in 0..num_rows {
+ // Only check f3_0 when the parent f3 struct is not null
+ if !f3_array.is_null(i) {
+ assert!(!f3_0_array.is_null(i));
+ assert_eq!(f3_0_array.value(i), "lorem ipsum");
+ }
+ }
+ }
+
fn corrupt_first_block_payload_byte(
mut bytes: Vec<u8>,
field_offset: usize,
@@ -8441,6 +8699,33 @@ mod test {
])),
false,
));
+ let person_md = {
+ let mut m = HashMap::<String, String>::new();
+ m.insert(AVRO_NAME_METADATA_KEY.to_string(), "Person".to_string());
+ m.insert(
+ AVRO_NAMESPACE_METADATA_KEY.to_string(),
+ "com.example".to_string(),
+ );
+ m
+ };
+ let maybe_auth_md = {
+ let mut m = HashMap::<String, String>::new();
+ m.insert(AVRO_NAME_METADATA_KEY.to_string(),
"MaybeAuth".to_string());
+ m.insert(
+ AVRO_NAMESPACE_METADATA_KEY.to_string(),
+ "org.apache.arrow.avrotests.v1.types".to_string(),
+ );
+ m
+ };
+ let address_md = {
+ let mut m = HashMap::<String, String>::new();
+ m.insert(AVRO_NAME_METADATA_KEY.to_string(),
"Address".to_string());
+ m.insert(
+ AVRO_NAMESPACE_METADATA_KEY.to_string(),
+ "org.apache.arrow.avrotests.v1.types".to_string(),
+ );
+ m
+ };
let rec_a_md = {
let mut m = HashMap::<String, String>::new();
m.insert(AVRO_NAME_METADATA_KEY.to_string(), "RecA".to_string());
@@ -8576,11 +8861,18 @@ mod test {
true,
),
]);
- let kv_item_field = Arc::new(Field::new(
- item_name,
- DataType::Struct(kv_fields.clone()),
- false,
- ));
+ let kv_md = {
+ let mut m = HashMap::<String, String>::new();
+ m.insert(AVRO_NAME_METADATA_KEY.to_string(), "KV".to_string());
+ m.insert(
+ AVRO_NAMESPACE_METADATA_KEY.to_string(),
+ "org.apache.arrow.avrotests.v1.types".to_string(),
+ );
+ m
+ };
+ let kv_item_field = Arc::new(
+ Field::new(item_name, DataType::Struct(kv_fields.clone()),
false).with_metadata(kv_md),
+ );
let map_int_entries = Arc::new(Field::new(
"entries",
DataType::Struct(Fields::from(vec![
@@ -8652,14 +8944,17 @@ mod test {
#[cfg(not(feature = "small_decimals"))]
let dec10_dt = DataType::Decimal128(10, 2);
let fields: Vec<FieldRef> = vec![
- Arc::new(Field::new(
- "person",
- DataType::Struct(Fields::from(vec![
- Field::new("name", DataType::Utf8, false),
- Field::new("age", DataType::Int32, false),
- ])),
- false,
- )),
+ Arc::new(
+ Field::new(
+ "person",
+ DataType::Struct(Fields::from(vec![
+ Field::new("name", DataType::Utf8, false),
+ Field::new("age", DataType::Int32, false),
+ ])),
+ false,
+ )
+ .with_metadata(person_md),
+ ),
Arc::new(Field::new("old_count", DataType::Int32, false)),
Arc::new(Field::new(
"union_map_or_array_int",
@@ -8691,23 +8986,29 @@ mod test {
DataType::Union(uf_union_big.clone(), UnionMode::Dense),
false,
)),
- Arc::new(Field::new(
- "maybe_auth",
- DataType::Struct(Fields::from(vec![
- Field::new("user", DataType::Utf8, false),
- Field::new("token", DataType::Binary, true), //
[bytes,null] -> nullable bytes
- ])),
- false,
- )),
- Arc::new(Field::new(
- "address",
- DataType::Struct(Fields::from(vec![
- Field::new("street_name", DataType::Utf8, false),
- Field::new("zip", DataType::Int32, false),
- Field::new("country", DataType::Utf8, false),
- ])),
- false,
- )),
+ Arc::new(
+ Field::new(
+ "maybe_auth",
+ DataType::Struct(Fields::from(vec![
+ Field::new("user", DataType::Utf8, false),
+ Field::new("token", DataType::Binary, true), //
[bytes,null] -> nullable bytes
+ ])),
+ false,
+ )
+ .with_metadata(maybe_auth_md),
+ ),
+ Arc::new(
+ Field::new(
+ "address",
+ DataType::Struct(Fields::from(vec![
+ Field::new("street_name", DataType::Utf8, false),
+ Field::new("zip", DataType::Int32, false),
+ Field::new("country", DataType::Utf8, false),
+ ])),
+ false,
+ )
+ .with_metadata(address_md),
+ ),
Arc::new(Field::new(
"map_union",
DataType::Map(map_entries_field.clone(), false),
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 7701eeea72..5e281d1fc6 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -18,7 +18,7 @@
//! Avro Decoder for Arrow types.
use crate::codec::{
- AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo,
ResolvedRecord,
+ AvroDataType, AvroLiteral, Codec, EnumMapping, Promotion, ResolutionInfo,
ResolvedRecord,
ResolvedUnion,
};
use crate::errors::AvroError;
@@ -38,22 +38,14 @@ use arrow_schema::{
};
#[cfg(feature = "avro_custom_types")]
use arrow_select::take::{TakeOptions, take};
-use std::cmp::Ordering;
-use std::sync::Arc;
use strum_macros::AsRefStr;
use uuid::Uuid;
-const DEFAULT_CAPACITY: usize = 1024;
+use std::cmp::Ordering;
+use std::mem;
+use std::sync::Arc;
-/// Runtime plan for decoding reader-side `["null", T]` types.
-#[derive(Clone, Copy, Debug)]
-enum NullablePlan {
- /// Writer actually wrote a union (branch tag present).
- ReadTag,
- /// Writer wrote a single (non-union) value resolved to the non-null branch
- /// of the reader union; do NOT read a branch tag, but apply any promotion.
- FromSingle { promotion: Promotion },
-}
+const DEFAULT_CAPACITY: usize = 1024;
/// Macro to decode a decimal payload for a given width and integer type.
macro_rules! decode_decimal {
@@ -121,13 +113,22 @@ impl RecordDecoder {
// Build Arrow schema fields and per-child decoders
let mut arrow_fields = Vec::with_capacity(reader_fields.len());
let mut encodings = Vec::with_capacity(reader_fields.len());
+ let mut field_defaults =
Vec::with_capacity(reader_fields.len());
for avro_field in reader_fields.iter() {
arrow_fields.push(avro_field.field());
encodings.push(Decoder::try_new(avro_field.data_type())?);
+
+ if let Some(ResolutionInfo::DefaultValue(lit)) =
+ avro_field.data_type().resolution.as_ref()
+ {
+ field_defaults.push(Some(lit.clone()));
+ } else {
+ field_defaults.push(None);
+ }
}
let projector = match data_type.resolution.as_ref() {
Some(ResolutionInfo::Record(rec)) => {
- Some(ProjectorBuilder::try_new(rec,
reader_fields).build()?)
+ Some(ProjectorBuilder::try_new(rec,
&field_defaults).build()?)
}
_ => None,
};
@@ -179,12 +180,6 @@ impl RecordDecoder {
}
}
-#[derive(Debug)]
-struct EnumResolution {
- mapping: Arc<[i32]>,
- default_index: i32,
-}
-
#[derive(Debug, AsRefStr)]
enum Decoder {
Null(usize),
@@ -249,7 +244,12 @@ enum Decoder {
/// String data encoded as UTF-8 bytes, but mapped to Arrow's
StringViewArray
StringView(OffsetBufferBuilder<i32>, Vec<u8>),
Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
- Record(Fields, Vec<Decoder>, Option<Projector>),
+ Record(
+ Fields,
+ Vec<Decoder>,
+ Vec<Option<AvroLiteral>>,
+ Option<Projector>,
+ ),
Map(
FieldRef,
OffsetBufferBuilder<i32>,
@@ -270,7 +270,7 @@ enum Decoder {
#[cfg(feature = "avro_custom_types")]
RunEndEncoded(u8, usize, Box<Decoder>),
Union(UnionDecoder),
- Nullable(Nullability, NullBufferBuilder, Box<Decoder>, NullablePlan),
+ Nullable(NullablePlan, NullBufferBuilder, Box<Decoder>),
}
impl Decoder {
@@ -279,7 +279,7 @@ impl Decoder {
if info.writer_is_union && !info.reader_is_union {
let mut clone = data_type.clone();
clone.resolution = None; // Build target base decoder without
Union resolution
- let target = Box::new(Self::try_new_internal(&clone)?);
+ let target = Self::try_new_internal(&clone)?;
let decoder = Self::Union(
UnionDecoderBuilder::new()
.with_resolved_union(info.clone())
@@ -295,7 +295,7 @@ impl Decoder {
fn try_new_internal(data_type: &AvroDataType) -> Result<Self, AvroError> {
// Extract just the Promotion (if any) to simplify pattern matching
let promotion = match data_type.resolution.as_ref() {
- Some(ResolutionInfo::Promotion(p)) => Some(p),
+ Some(ResolutionInfo::Promotion(p)) => Some(*p),
_ => None,
};
let decoder = match (data_type.codec(), promotion) {
@@ -466,10 +466,9 @@ impl Decoder {
}
(Codec::Enum(symbols), _) => {
let res = match data_type.resolution.as_ref() {
- Some(ResolutionInfo::EnumMapping(mapping)) =>
Some(EnumResolution {
- mapping: mapping.mapping.clone(),
- default_index: mapping.default_index,
- }),
+ Some(ResolutionInfo::EnumMapping(mapping)) => {
+ Some(EnumResolution::new(mapping))
+ }
_ => None,
};
Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY),
symbols.clone(), res)
@@ -477,18 +476,27 @@ impl Decoder {
(Codec::Struct(fields), _) => {
let mut arrow_fields = Vec::with_capacity(fields.len());
let mut encodings = Vec::with_capacity(fields.len());
+ let mut field_defaults = Vec::with_capacity(fields.len());
for avro_field in fields.iter() {
let encoding = Self::try_new(avro_field.data_type())?;
arrow_fields.push(avro_field.field());
encodings.push(encoding);
+
+ if let Some(ResolutionInfo::DefaultValue(lit)) =
+ avro_field.data_type().resolution.as_ref()
+ {
+ field_defaults.push(Some(lit.clone()));
+ } else {
+ field_defaults.push(None);
+ }
}
let projector =
if let Some(ResolutionInfo::Record(rec)) =
data_type.resolution.as_ref() {
- Some(ProjectorBuilder::try_new(rec, fields).build()?)
+ Some(ProjectorBuilder::try_new(rec,
&field_defaults).build()?)
} else {
None
};
- Self::Record(arrow_fields.into(), encodings, projector)
+ Self::Record(arrow_fields.into(), encodings, field_defaults,
projector)
}
(Codec::Map(child), _) => {
let val_field = child.field_with_name("value");
@@ -568,20 +576,49 @@ impl Decoder {
};
Ok(match data_type.nullability() {
Some(nullability) => {
- // Default to reading a union branch tag unless the resolution
proves otherwise.
- let mut plan = NullablePlan::ReadTag;
- if let Some(ResolutionInfo::Union(info)) =
data_type.resolution.as_ref() {
- if !info.writer_is_union && info.reader_is_union {
- if let Some(Some((_reader_idx, promo))) =
info.writer_to_reader.first() {
- plan = NullablePlan::FromSingle { promotion:
*promo };
+ // Default to reading a union branch tag unless the resolution
directs otherwise.
+ let plan = match &data_type.resolution {
+ None => NullablePlan::ReadTag {
+ nullability,
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
+ },
+ Some(ResolutionInfo::Promotion(_)) => {
+ // Promotions should have been incorporated
+ // into the inner decoder.
+ NullablePlan::FromSingle {
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
}
}
- }
+ Some(ResolutionInfo::Union(info)) if !info.writer_is_union
=> {
+ let Some(Some((_, resolution))) =
info.writer_to_reader.first() else {
+ return Err(AvroError::SchemaError(
+ "unexpected union resolution info for
non-union writer and union reader type".into(),
+ ));
+ };
+ let resolution = ResolutionPlan::try_new(&decoder,
resolution)?;
+ NullablePlan::FromSingle { resolution }
+ }
+ Some(ResolutionInfo::Union(info)) => {
+ let Some((_, resolution)) =
+
info.writer_to_reader[nullability.non_null_index()].as_ref()
+ else {
+ return Err(AvroError::SchemaError(
+ "unexpected union resolution info for nullable
writer type".into(),
+ ));
+ };
+ NullablePlan::ReadTag {
+ nullability,
+ resolution: ResolutionPlan::try_new(&decoder,
resolution)?,
+ }
+ }
+ Some(resolution) => NullablePlan::FromSingle {
+ resolution: ResolutionPlan::try_new(&decoder,
resolution)?,
+ },
+ };
Self::Nullable(
- nullability,
+ plan,
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(decoder),
- plan,
)
}
None => decoder,
@@ -645,7 +682,7 @@ impl Decoder {
Self::Array(_, offsets, _) => {
offsets.push_length(0);
}
- Self::Record(_, e, _) => {
+ Self::Record(_, e, _, _) => {
for encoding in e.iter_mut() {
encoding.append_null()?;
}
@@ -670,7 +707,7 @@ impl Decoder {
inner.append_null()?;
}
Self::Union(u) => u.append_null()?,
- Self::Nullable(_, null_buffer, inner, _) => {
+ Self::Nullable(_, null_buffer, inner) => {
null_buffer.append(false);
inner.append_null()?;
}
@@ -681,7 +718,7 @@ impl Decoder {
/// Append a single default literal into the decoder's buffers
fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), AvroError> {
match self {
- Self::Nullable(_, nb, inner, _) => {
+ Self::Nullable(_, nb, inner) => {
if matches!(lit, AvroLiteral::Null) {
nb.append(false);
inner.append_null()
@@ -1087,14 +1124,14 @@ impl Decoder {
inner.append_default(lit)
}
Self::Union(u) => u.append_default(lit),
- Self::Record(field_meta, decoders, projector) => match lit {
+ Self::Record(field_meta, decoders, field_defaults, _) => match lit
{
AvroLiteral::Map(entries) => {
for (i, dec) in decoders.iter_mut().enumerate() {
let name = field_meta[i].name();
if let Some(sub) = entries.get(name) {
dec.append_default(sub)?;
- } else if let Some(proj) = projector.as_ref() {
- proj.project_default(dec, i)?;
+ } else if let Some(default_literal) =
field_defaults[i].as_ref() {
+ dec.append_default(default_literal)?;
} else {
dec.append_null()?;
}
@@ -1103,8 +1140,8 @@ impl Decoder {
}
AvroLiteral::Null => {
for (i, dec) in decoders.iter_mut().enumerate() {
- if let Some(proj) = projector.as_ref() {
- proj.project_default(dec, i)?;
+ if let Some(default_literal) =
field_defaults[i].as_ref() {
+ dec.append_default(default_literal)?;
} else {
dec.append_null()?;
}
@@ -1246,12 +1283,12 @@ impl Decoder {
let total_items = read_blocks(buf, |cursor|
encoding.decode(cursor))?;
off.push_length(total_items);
}
- Self::Record(_, encodings, None) => {
+ Self::Record(_, encodings, _, None) => {
for encoding in encodings {
encoding.decode(buf)?;
}
}
- Self::Record(_, encodings, Some(proj)) => {
+ Self::Record(_, encodings, _, Some(proj)) => {
proj.project_record(buf, encodings)?;
}
Self::Map(_, koff, moff, kdata, valdec) => {
@@ -1286,18 +1323,8 @@ impl Decoder {
}
Self::Enum(indices, _, Some(res)) => {
let raw = buf.get_int()?;
- let resolved = usize::try_from(raw)
- .ok()
- .and_then(|idx| res.mapping.get(idx).copied())
- .filter(|&idx| idx >= 0)
- .unwrap_or(res.default_index);
- if resolved >= 0 {
- indices.push(resolved);
- } else {
- return Err(AvroError::ParseError(format!(
- "Enum symbol index {raw} not resolvable and no default
provided",
- )));
- }
+ let resolved = res.resolve(raw)?;
+ indices.push(resolved);
}
Self::Duration(builder) => {
let b = buf.get_fixed(12)?;
@@ -1313,26 +1340,31 @@ impl Decoder {
inner.decode(buf)?;
}
Self::Union(u) => u.decode(buf)?,
- Self::Nullable(order, nb, encoding, plan) => match *plan {
- NullablePlan::FromSingle { promotion } => {
- encoding.decode_with_promotion(buf, promotion)?;
- nb.append(true);
- }
- NullablePlan::ReadTag => {
- let branch = buf.read_vlq()?;
- let is_not_null = match *order {
- Nullability::NullFirst => branch != 0,
- Nullability::NullSecond => branch == 0,
- };
- if is_not_null {
- // It is important to decode before appending to null
buffer in case of decode error
- encoding.decode(buf)?;
- } else {
- encoding.append_null()?;
+ Self::Nullable(plan, nb, encoding) => {
+ match plan {
+ NullablePlan::FromSingle { resolution } => {
+ encoding.decode_with_resolution(buf, resolution)?;
+ nb.append(true);
+ }
+ NullablePlan::ReadTag {
+ nullability,
+ resolution,
+ } => {
+ let branch = buf.read_vlq()?;
+ let is_not_null = match *nullability {
+ Nullability::NullFirst => branch != 0,
+ Nullability::NullSecond => branch == 0,
+ };
+ if is_not_null {
+ // It is important to decode before appending to
null buffer in case of decode error
+ encoding.decode_with_resolution(buf, resolution)?;
+ } else {
+ encoding.append_null()?;
+ }
+ nb.append(is_not_null);
}
- nb.append(is_not_null);
}
- },
+ }
}
Ok(())
}
@@ -1401,10 +1433,49 @@ impl Decoder {
}
}
+    /// Decode one value from `buf` into this decoder, applying a writer-to-reader
+    /// resolution plan.
+    ///
+    /// * `ResolutionPlan::Promotion` delegates to `decode_with_promotion`.
+    /// * `ResolutionPlan::DefaultValue` appends the literal via `append_default`; it does
+    ///   not read anything from `buf`.
+    /// * `ResolutionPlan::EnumMapping` reads the writer's enum index and remaps it; this
+    ///   decoder must be `Decoder::Enum`.
+    /// * `ResolutionPlan::Record` projects the writer's record onto this decoder's child
+    ///   decoders; this decoder must be `Decoder::Record`.
+    ///
+    /// With the `avro_custom_types` feature, a `RunEndEncoded` wrapper increments its
+    /// counter and forwards the call to its inner decoder.
+    ///
+    /// # Errors
+    /// Returns `AvroError::SchemaError` when the plan does not match this decoder's
+    /// variant, and propagates any error from reading `buf` or resolving the value.
+    fn decode_with_resolution(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        resolution: &ResolutionPlan,
+    ) -> Result<(), AvroError> {
+        #[cfg(feature = "avro_custom_types")]
+        if let Self::RunEndEncoded(_, len, inner) = self {
+            *len += 1;
+            return inner.decode_with_resolution(buf, resolution);
+        }
+
+        match resolution {
+            ResolutionPlan::Promotion(promotion) => self.decode_with_promotion(buf, *promotion),
+            ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
+            ResolutionPlan::EnumMapping(res) => {
+                let Self::Enum(indices, _, _) = self else {
+                    return Err(AvroError::SchemaError(
+                        "enum mapping resolution provided for non-enum decoder".into(),
+                    ));
+                };
+                let raw = buf.get_int()?;
+                // Resolve before pushing so a failed lookup leaves `indices` untouched.
+                let resolved = res.resolve(raw)?;
+                indices.push(resolved);
+                Ok(())
+            }
+            ResolutionPlan::Record(proj) => {
+                let Self::Record(_, encodings, _, _) = self else {
+                    return Err(AvroError::SchemaError(
+                        "record projection provided for non-record decoder".into(),
+                    ));
+                };
+                proj.project_record(buf, encodings)
+            }
+        }
+    }
+
/// Flush decoded records to an [`ArrayRef`]
fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef,
AvroError> {
Ok(match self {
- Self::Nullable(_, n, e, _) => e.flush(n.finish())?,
+ Self::Nullable(_, n, e) => e.flush(n.finish())?,
Self::Null(size) =>
Arc::new(NullArray::new(std::mem::replace(size, 0))),
Self::Boolean(b) => Arc::new(BooleanArray::new(b.finish(), nulls)),
Self::Int32(values) =>
Arc::new(flush_primitive::<Int32Type>(values, nulls)),
@@ -1533,7 +1604,7 @@ impl Decoder {
let offsets = flush_offsets(offsets);
Arc::new(ListArray::try_new(field.clone(), offsets, values,
nulls)?)
}
- Self::Record(fields, encodings, _) => {
+ Self::Record(fields, encodings, _, _) => {
let arrays = encodings
.iter_mut()
.map(|x| x.flush(None))
@@ -1678,6 +1749,83 @@ impl Decoder {
}
}
+/// Runtime plan for decoding reader-side `["null", T]` types.
+///
+/// The plan is picked once when the nullable decoder is built (from the field's
+/// resolution info), so the per-value decode path only branches on this enum.
+#[derive(Debug)]
+enum NullablePlan {
+    /// Writer actually wrote a union (branch tag present).
+    ///
+    /// `nullability` tells which tag value denotes the null branch; `resolution` is
+    /// applied to the inner decoder only when the non-null branch was written.
+    ReadTag {
+        nullability: Nullability,
+        resolution: ResolutionPlan,
+    },
+    /// Writer wrote a single (non-union) value resolved to the non-null branch
+    /// of the reader union; do NOT read a branch tag, but apply any resolution.
+    FromSingle { resolution: ResolutionPlan },
+}
+
+/// Runtime form of a writer-to-reader type resolution, ready to be applied while decoding.
+#[derive(Debug)]
+enum ResolutionPlan {
+    /// Promote the writer's value to the reader's type (e.g. widen a number).
+    Promotion(Promotion),
+    /// Value to use for a reader field that the writer type does not provide.
+    DefaultValue(AvroLiteral),
+    /// Enum symbol remapping from writer indices to reader indices.
+    EnumMapping(EnumResolution),
+    /// Field projection from the writer record onto the reader record.
+    Record(Projector),
+}
+
+impl ResolutionPlan {
+    /// Convert schema-level [`ResolutionInfo`] into a plan for `decoder`.
+    ///
+    /// Only record resolution inspects `decoder`: the projector is built from the
+    /// per-field defaults stored in a `Decoder::Record`.
+    ///
+    /// # Errors
+    /// `AvroError::SchemaError` when record resolution targets a non-record decoder or
+    /// when the resolution is itself a union resolution; projector build errors are
+    /// propagated.
+    fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self, AvroError> {
+        let plan = match resolution {
+            ResolutionInfo::Promotion(promotion) => Self::Promotion(*promotion),
+            ResolutionInfo::DefaultValue(literal) => Self::DefaultValue(literal.clone()),
+            ResolutionInfo::EnumMapping(mapping) => {
+                Self::EnumMapping(EnumResolution::new(mapping))
+            }
+            ResolutionInfo::Record(record) => {
+                let Decoder::Record(_, _, defaults, _) = decoder else {
+                    return Err(AvroError::SchemaError(
+                        "record resolution on non-record decoder".into(),
+                    ));
+                };
+                Self::Record(ProjectorBuilder::try_new(record, defaults).build()?)
+            }
+            ResolutionInfo::Union(_) => {
+                return Err(AvroError::SchemaError(
+                    "union variant cannot be resolved to a union type".into(),
+                ));
+            }
+        };
+        Ok(plan)
+    }
+}
+
+/// Writer-to-reader enum symbol remapping applied while decoding enum values.
+#[derive(Debug)]
+struct EnumResolution {
+    /// Reader symbol index for each writer symbol index; negative entries are unmapped.
+    mapping: Arc<[i32]>,
+    /// Reader index used when a writer index is unmapped; negative means no default.
+    default_index: i32,
+}
+
+impl EnumResolution {
+    /// Capture the mapping table and default index from an [`EnumMapping`].
+    fn new(mapping: &EnumMapping) -> Self {
+        Self {
+            mapping: Arc::clone(&mapping.mapping),
+            default_index: mapping.default_index,
+        }
+    }
+
+    /// Translate a writer enum index into the reader's enum index.
+    ///
+    /// Negative, out-of-range, or unmapped writer indices fall back to `default_index`.
+    ///
+    /// # Errors
+    /// `AvroError::ParseError` when neither the mapping nor the default gives a
+    /// non-negative reader index.
+    fn resolve(&self, index: i32) -> Result<i32, AvroError> {
+        let mapped = match usize::try_from(index) {
+            Ok(slot) => self.mapping.get(slot).copied().filter(|&target| target >= 0),
+            Err(_) => None,
+        };
+        match mapped.unwrap_or(self.default_index) {
+            target if target >= 0 => Ok(target),
+            _ => Err(AvroError::ParseError(format!(
+                "Enum symbol index {index} not resolvable and no default provided",
+            ))),
+        }
+    }
+}
+
// A lookup table for resolving fields between writer and reader schemas
during record projection.
#[derive(Debug)]
struct DispatchLookupTable {
@@ -1697,11 +1845,11 @@ struct DispatchLookupTable {
// - `to_reader.len() == promotion.len()` and matches the reader field
count.
// - If `to_reader[r] == NO_SOURCE`, `promotion[r]` is ignored.
to_reader: Box<[i8]>,
- // For each reader field `r`, specifies the `Promotion` to apply to the
writer's value.
+ // For each reader field `r`, specifies the resolution to apply to the
writer's value.
//
// This is used when a writer field's type can be promoted to a reader
field's type
// (e.g., `Int` to `Long`). It is ignored if `to_reader[r] == NO_SOURCE`.
- promotion: Box<[Promotion]>,
+ resolution: Box<[ResolutionPlan]>,
}
// Sentinel used in `DispatchLookupTable::to_reader` to mark
@@ -1710,64 +1858,94 @@ const NO_SOURCE: i8 = -1;
impl DispatchLookupTable {
fn from_writer_to_reader(
- promotion_map: &[Option<(usize, Promotion)>],
+ reader_branches: &[Decoder],
+ resolution_map: &[Option<(usize, ResolutionInfo)>],
) -> Result<Self, AvroError> {
- let mut to_reader = Vec::with_capacity(promotion_map.len());
- let mut promotion = Vec::with_capacity(promotion_map.len());
- for map in promotion_map {
- match *map {
- Some((idx, promo)) => {
+ let mut to_reader = Vec::with_capacity(resolution_map.len());
+ let mut resolution = Vec::with_capacity(resolution_map.len());
+ for map in resolution_map {
+ match map {
+ Some((idx, res)) => {
+ let idx = *idx;
let idx_i8 = i8::try_from(idx).map_err(|_| {
AvroError::SchemaError(format!(
"Reader branch index {idx} exceeds i8 range (max
{})",
i8::MAX
))
})?;
+ let plan = ResolutionPlan::try_new(&reader_branches[idx],
res)?;
to_reader.push(idx_i8);
- promotion.push(promo);
+ resolution.push(plan);
}
None => {
to_reader.push(NO_SOURCE);
- promotion.push(Promotion::Direct);
+
resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
}
}
}
Ok(Self {
to_reader: to_reader.into_boxed_slice(),
- promotion: promotion.into_boxed_slice(),
+ resolution: resolution.into_boxed_slice(),
})
}
- // Resolve a writer branch index to (reader_idx, promotion)
+ // Resolve a writer branch index to (reader_idx, resolution)
#[inline]
- fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+ fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)>
{
let reader_index = *self.to_reader.get(writer_index)?;
- (reader_index >= 0).then(|| (reader_index as usize,
self.promotion[writer_index]))
+ (reader_index >= 0).then(|| (reader_index as usize,
&self.resolution[writer_index]))
}
}
#[derive(Debug)]
struct UnionDecoder {
fields: UnionFields,
- type_ids: Vec<i8>,
- offsets: Vec<i32>,
- branches: Vec<Decoder>,
- counts: Vec<i32>,
- reader_type_codes: Vec<i8>,
+ branches: UnionDecoderBranches,
default_emit_idx: usize,
null_emit_idx: usize,
plan: UnionReadPlan,
}
+/// Per-branch decoding state of a union decoder: the child decoder of each reader
+/// branch plus the type-id / offset buffers accumulated while decoding.
+#[derive(Debug, Default)]
+struct UnionDecoderBranches {
+    /// One decoder per reader union branch.
+    decoders: Vec<Decoder>,
+    /// Type id recorded for each reader branch.
+    reader_type_codes: Vec<i8>,
+    /// Type id of every decoded value, in row order.
+    type_ids: Vec<i8>,
+    /// Offset of every decoded value within its branch's child decoder.
+    offsets: Vec<i32>,
+    /// Number of values emitted so far into each branch.
+    counts: Vec<i32>,
+}
+
+impl UnionDecoderBranches {
+    /// Create branch state with empty type-id/offset buffers and zeroed per-branch counts.
+    fn new(decoders: Vec<Decoder>, reader_type_codes: Vec<i8>) -> Self {
+        let branch_len = decoders.len().max(reader_type_codes.len());
+        Self {
+            decoders,
+            reader_type_codes,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            counts: vec![0; branch_len],
+        }
+    }
+
+    /// Record that the next value belongs to reader branch `reader_idx` and return
+    /// that branch's decoder so the caller can decode the value into it.
+    ///
+    /// # Errors
+    /// `AvroError::ParseError` if `reader_idx` has no decoder or no type code; no
+    /// buffer is modified in that case.
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, AvroError> {
+        let branches_len = self.decoders.len();
+        // `new` tolerates `decoders` and `reader_type_codes` of different lengths, so
+        // check both lookups up front instead of indexing the type codes (which could
+        // panic) after only the decoder lookup succeeded.
+        let (Some(reader_branch), Some(&type_code)) = (
+            self.decoders.get_mut(reader_idx),
+            self.reader_type_codes.get(reader_idx),
+        ) else {
+            return Err(AvroError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({branches_len} branches)"
+            )));
+        };
+        self.type_ids.push(type_code);
+        // `counts` has at least `decoders.len()` entries (see `new`), so this is in bounds.
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(reader_branch)
+    }
+}
+
impl Default for UnionDecoder {
fn default() -> Self {
Self {
fields: UnionFields::empty(),
- type_ids: Vec::new(),
- offsets: Vec::new(),
- branches: Vec::new(),
- counts: Vec::new(),
- reader_type_codes: Vec::new(),
+ branches: Default::default(),
default_emit_idx: 0,
null_emit_idx: 0,
plan: UnionReadPlan::Passthrough,
@@ -1782,7 +1960,7 @@ enum UnionReadPlan {
},
FromSingle {
reader_idx: usize,
- promotion: Promotion,
+ resolution: ResolutionPlan,
},
ToSingle {
target: Box<Decoder>,
@@ -1791,6 +1969,47 @@ enum UnionReadPlan {
Passthrough,
}
+impl UnionReadPlan {
+    /// Build the read plan for a union decoder whose reader branches are `reader_branches`.
+    ///
+    /// * `None` gives [`UnionReadPlan::Passthrough`].
+    /// * Writer union and reader union give [`UnionReadPlan::ReaderUnion`], with a lookup
+    ///   table from writer branches to reader branches and their resolutions.
+    /// * Single writer type and reader union give [`UnionReadPlan::FromSingle`], targeting
+    ///   the reader branch the writer type resolved to.
+    ///
+    /// # Errors
+    /// Writer-union-to-single resolution is rejected with `AvroError::InvalidArgument`
+    /// (it needs a target decoder supplied through `UnionDecoderBuilder`). Inconsistent
+    /// resolution info, including a reader branch index with no matching decoder,
+    /// yields `AvroError::SchemaError`.
+    fn from_resolved(
+        reader_branches: &[Decoder],
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, AvroError> {
+        let Some(info) = resolved else {
+            return Ok(Self::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table = DispatchLookupTable::from_writer_to_reader(
+                    reader_branches,
+                    &info.writer_to_reader,
+                )?;
+                Ok(Self::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some((idx, resolution)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(AvroError::SchemaError(
+                        "Writer type does not match any reader union branch".to_string(),
+                    ));
+                };
+                let reader_idx = *idx;
+                // Guard the resolver-provided index instead of panicking on a bad schema.
+                let Some(reader_branch) = reader_branches.get(reader_idx) else {
+                    return Err(AvroError::SchemaError(format!(
+                        "Reader union branch index {reader_idx} out of range ({} branches)",
+                        reader_branches.len()
+                    )));
+                };
+                Ok(Self::FromSingle {
+                    reader_idx,
+                    resolution: ResolutionPlan::try_new(reader_branch, resolution)?,
+                })
+            }
+            (true, false) => Err(AvroError::InvalidArgument(
+                "UnionDecoder::try_new cannot build writer-union to single; use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            // (false, false) is invalid and should never be constructed by the resolver.
+            _ => Err(AvroError::SchemaError(
+                "ResolvedUnion constructed for non-union sides; resolver should return None"
+                    .to_string(),
+            )),
+        }
+    }
+}
+
impl UnionDecoder {
fn try_new(
fields: UnionFields,
@@ -1801,7 +2020,6 @@ impl UnionDecoder {
let null_branch = branches.iter().position(|b| matches!(b,
Decoder::Null(_)));
let default_emit_idx = 0;
let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
- let branch_len = branches.len().max(reader_type_codes.len());
// Guard against impractically large unions that cannot be indexed by
an Avro int
let max_addr = (i32::MAX as usize) + 1;
if branches.len() > max_addr {
@@ -1812,26 +2030,23 @@ impl UnionDecoder {
i32::MAX
)));
}
+ let plan = UnionReadPlan::from_resolved(&branches, resolved)?;
Ok(Self {
fields,
- type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
- offsets: Vec::with_capacity(DEFAULT_CAPACITY),
- branches,
- counts: vec![0; branch_len],
- reader_type_codes,
+ branches: UnionDecoderBranches::new(branches, reader_type_codes),
default_emit_idx,
null_emit_idx,
- plan: Self::plan_from_resolved(resolved)?,
+ plan,
})
}
- fn try_new_from_writer_union(
- info: ResolvedUnion,
- target: Box<Decoder>,
- ) -> Result<Self, AvroError> {
+ fn with_single_target(target: Decoder, info: ResolvedUnion) ->
Result<Self, AvroError> {
// This constructor is only for writer-union to single-type resolution
debug_assert!(info.writer_is_union && !info.reader_is_union);
- let lookup_table =
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
+ let mut reader_branches = [target];
+ let lookup_table =
+ DispatchLookupTable::from_writer_to_reader(&reader_branches,
&info.writer_to_reader)?;
+ let target = Box::new(mem::replace(&mut reader_branches[0],
Decoder::Null(0)));
Ok(Self {
plan: UnionReadPlan::ToSingle {
target,
@@ -1841,41 +2056,6 @@ impl UnionDecoder {
})
}
- fn plan_from_resolved(resolved: Option<ResolvedUnion>) ->
Result<UnionReadPlan, AvroError> {
- let Some(info) = resolved else {
- return Ok(UnionReadPlan::Passthrough);
- };
- match (info.writer_is_union, info.reader_is_union) {
- (true, true) => {
- let lookup_table =
-
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader)?;
- Ok(UnionReadPlan::ReaderUnion { lookup_table })
- }
- (false, true) => {
- let Some(&(reader_idx, promotion)) =
- info.writer_to_reader.first().and_then(Option::as_ref)
- else {
- return Err(AvroError::SchemaError(
- "Writer type does not match any reader union
branch".to_string(),
- ));
- };
- Ok(UnionReadPlan::FromSingle {
- reader_idx,
- promotion,
- })
- }
- (true, false) => Err(AvroError::InvalidArgument(
- "UnionDecoder::try_new cannot build writer-union to single;
use UnionDecoderBuilder with a target"
- .to_string(),
- )),
- // (false, false) is invalid and should never be constructed by
the resolver.
- _ => Err(AvroError::SchemaError(
- "ResolvedUnion constructed for non-union sides; resolver
should return None"
- .to_string(),
- )),
- }
- }
-
#[inline]
fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, AvroError> {
// Avro unions are encoded by first writing the zero-based branch
index.
@@ -1896,20 +2076,6 @@ impl UnionDecoder {
})
}
- #[inline]
- fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder,
AvroError> {
- let branches_len = self.branches.len();
- let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
- return Err(AvroError::ParseError(format!(
- "Union branch index {reader_idx} out of range ({branches_len}
branches)"
- )));
- };
- self.type_ids.push(self.reader_type_codes[reader_idx]);
- self.offsets.push(self.counts[reader_idx]);
- self.counts[reader_idx] += 1;
- Ok(reader_branch)
- }
-
#[inline]
fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(),
AvroError>
where
@@ -1922,7 +2088,7 @@ impl UnionDecoder {
UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
_ => fallback_idx,
};
- self.emit_to(reader_idx).and_then(action)
+ self.branches.emit_to(reader_idx).and_then(action)
}
fn append_null(&mut self) -> Result<(), AvroError> {
@@ -1934,35 +2100,42 @@ impl UnionDecoder {
}
fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
- let (reader_idx, promotion) = match &mut self.plan {
- UnionReadPlan::Passthrough => (Self::read_tag(buf)?,
Promotion::Direct),
+ match &mut self.plan {
+ UnionReadPlan::Passthrough => {
+ let reader_idx = Self::read_tag(buf)?;
+ let decoder = self.branches.emit_to(reader_idx)?;
+ decoder.decode(buf)
+ }
UnionReadPlan::ReaderUnion { lookup_table } => {
let idx = Self::read_tag(buf)?;
- lookup_table.resolve(idx).ok_or_else(|| {
- AvroError::ParseError(format!(
+ let Some((reader_idx, resolution)) = lookup_table.resolve(idx)
else {
+ return Err(AvroError::ParseError(format!(
"Union branch index {idx} not resolvable by reader
schema"
- ))
- })?
+ )));
+ };
+ let decoder = self.branches.emit_to(reader_idx)?;
+ decoder.decode_with_resolution(buf, resolution)
}
UnionReadPlan::FromSingle {
reader_idx,
- promotion,
- } => (*reader_idx, *promotion),
+ resolution,
+ } => {
+ let decoder = self.branches.emit_to(*reader_idx)?;
+ decoder.decode_with_resolution(buf, resolution)
+ }
UnionReadPlan::ToSingle {
target,
lookup_table,
} => {
let idx = Self::read_tag(buf)?;
- return match lookup_table.resolve(idx) {
- Some((_, promotion)) => target.decode_with_promotion(buf,
promotion),
- None => Err(AvroError::ParseError(format!(
- "Writer union branch {idx} does not resolve to reader
type"
- ))),
+ let Some((_, resolution)) = lookup_table.resolve(idx) else {
+ return Err(AvroError::ParseError(format!(
+ "Writer union branch index {idx} not resolvable by
reader schema"
+ )));
};
+ target.decode_with_resolution(buf, resolution)
}
- };
- let decoder = self.emit_to(reader_idx)?;
- decoder.decode_with_promotion(buf, promotion)
+ }
}
fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef,
AvroError> {
@@ -1976,13 +2149,20 @@ impl UnionDecoder {
);
let children = self
.branches
+ .decoders
.iter_mut()
.map(|d| d.flush(None))
.collect::<Result<Vec<_>, _>>()?;
let arr = UnionArray::try_new(
self.fields.clone(),
- flush_values(&mut self.type_ids).into_iter().collect(),
- Some(flush_values(&mut self.offsets).into_iter().collect()),
+ flush_values(&mut self.branches.type_ids)
+ .into_iter()
+ .collect(),
+ Some(
+ flush_values(&mut self.branches.offsets)
+ .into_iter()
+ .collect(),
+ ),
children,
)
.map_err(|e| AvroError::ParseError(e.to_string()))?;
@@ -1995,7 +2175,7 @@ struct UnionDecoderBuilder {
fields: Option<UnionFields>,
branches: Option<Vec<Decoder>>,
resolved: Option<ResolvedUnion>,
- target: Option<Box<Decoder>>,
+ target: Option<Decoder>,
}
impl UnionDecoderBuilder {
@@ -2018,7 +2198,7 @@ impl UnionDecoderBuilder {
self
}
- fn with_target(mut self, target: Box<Decoder>) -> Self {
+ fn with_target(mut self, target: Decoder) -> Self {
self.target = Some(target);
self
}
@@ -2031,7 +2211,7 @@ impl UnionDecoderBuilder {
(Some(info), None, None, Some(target))
if info.writer_is_union && !info.reader_is_union =>
{
- UnionDecoder::try_new_from_writer_union(info, target)
+ UnionDecoder::with_single_target(target, info)
}
_ => Err(AvroError::InvalidArgument(
"Invalid UnionDecoderBuilder configuration: expected either \
@@ -2238,42 +2418,31 @@ fn values_equal_at(arr: &dyn Array, i: usize, j: usize)
-> bool {
struct Projector {
writer_to_reader: Arc<[Option<usize>]>,
skip_decoders: Vec<Option<Skipper>>,
- field_defaults: Vec<Option<AvroLiteral>>,
default_injections: Arc<[(usize, AvroLiteral)]>,
}
#[derive(Debug)]
struct ProjectorBuilder<'a> {
rec: &'a ResolvedRecord,
- reader_fields: Arc<[AvroField]>,
+ field_defaults: &'a [Option<AvroLiteral>],
}
impl<'a> ProjectorBuilder<'a> {
#[inline]
- fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) ->
Self {
+ fn try_new(rec: &'a ResolvedRecord, field_defaults: &'a
[Option<AvroLiteral>]) -> Self {
Self {
rec,
- reader_fields: reader_fields.clone(),
+ field_defaults,
}
}
#[inline]
fn build(self) -> Result<Projector, AvroError> {
- let reader_fields = self.reader_fields;
- let mut field_defaults: Vec<Option<AvroLiteral>> =
Vec::with_capacity(reader_fields.len());
- for avro_field in reader_fields.as_ref() {
- if let Some(ResolutionInfo::DefaultValue(lit)) =
- avro_field.data_type().resolution.as_ref()
- {
- field_defaults.push(Some(lit.clone()));
- } else {
- field_defaults.push(None);
- }
- }
let mut default_injections: Vec<(usize, AvroLiteral)> =
Vec::with_capacity(self.rec.default_fields.len());
for &idx in self.rec.default_fields.as_ref() {
- let lit = field_defaults
+ let lit = self
+ .field_defaults
.get(idx)
.and_then(|lit| lit.clone())
.unwrap_or(AvroLiteral::Null);
@@ -2291,31 +2460,15 @@ impl<'a> ProjectorBuilder<'a> {
Ok(Projector {
writer_to_reader: self.rec.writer_to_reader.clone(),
skip_decoders,
- field_defaults,
default_injections: default_injections.into(),
})
}
}
impl Projector {
- #[inline]
- fn project_default(&self, decoder: &mut Decoder, index: usize) ->
Result<(), AvroError> {
- // SAFETY: `index` is obtained by listing the reader's record fields
(i.e., from
- // `decoders.iter_mut().enumerate()`), and `field_defaults` was built
in
- // `ProjectorBuilder::build` to have exactly one element per reader
field.
- // Therefore, `index < self.field_defaults.len()` always holds here, so
- // `self.field_defaults[index]` cannot panic. We only take an
immutable reference
- // via `.as_ref()`, and `self` is borrowed immutably.
- if let Some(default_literal) = self.field_defaults[index].as_ref() {
- decoder.append_default(default_literal)
- } else {
- decoder.append_null()
- }
- }
-
#[inline]
fn project_record(
- &mut self,
+ &self,
buf: &mut AvroCursor<'_>,
encodings: &mut [Decoder],
) -> Result<(), AvroError> {
@@ -2327,10 +2480,10 @@ impl Projector {
for (i, (mapping, skipper_opt)) in self
.writer_to_reader
.iter()
- .zip(self.skip_decoders.iter_mut())
+ .zip(self.skip_decoders.iter())
.enumerate()
{
- match (mapping, skipper_opt.as_mut()) {
+ match (mapping, skipper_opt.as_ref()) {
(Some(reader_index), _) =>
encodings[*reader_index].decode(buf)?,
(None, Some(skipper)) => skipper.skip(buf)?,
(None, None) => {
@@ -2459,7 +2612,7 @@ impl Skipper {
Ok(base)
}
- fn skip(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
+ fn skip(&self, buf: &mut AvroCursor<'_>) -> Result<(), AvroError> {
match self {
Self::Null => Ok(()),
Self::Boolean => {
@@ -2522,7 +2675,7 @@ impl Skipper {
Ok(())
}
Self::Struct(fields) => {
- for f in fields.iter_mut() {
+ for f in fields.iter() {
f.skip(buf)?
}
Ok(())
@@ -2541,7 +2694,7 @@ impl Skipper {
(usize::BITS as usize)
))
})?;
- let Some(encoding) = encodings.get_mut(idx) else {
+ let Some(encoding) = encodings.get(idx) else {
return Err(AvroError::ParseError(format!(
"Union branch index {idx} out of range for skipper ({}
branches)",
encodings.len()
@@ -3488,10 +3641,12 @@ mod tests {
let dt = avro_from_codec(Codec::Decimal(4, Some(1), None));
let inner = Decoder::try_new(&dt).unwrap();
let mut decoder = Decoder::Nullable(
- Nullability::NullSecond,
+ NullablePlan::ReadTag {
+ nullability: Nullability::NullSecond,
+ resolution: ResolutionPlan::Promotion(Promotion::Direct),
+ },
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(inner),
- NullablePlan::ReadTag,
);
let mut data = Vec::new();
data.extend_from_slice(&encode_avro_int(0));
@@ -3531,10 +3686,12 @@ mod tests {
let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16)));
let inner = Decoder::try_new(&dt).unwrap();
let mut decoder = Decoder::Nullable(
- Nullability::NullSecond,
+ NullablePlan::ReadTag {
+ nullability: Nullability::NullSecond,
+ resolution: ResolutionPlan::Promotion(Promotion::Direct),
+ },
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(inner),
- NullablePlan::ReadTag,
);
let row1 = [
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x01,
@@ -3992,10 +4149,10 @@ mod tests {
Decoder::Record(
fields,
encodings,
+ vec![None; reader_fields.len()],
Some(Projector {
writer_to_reader: Arc::from(writer_to_reader),
skip_decoders,
- field_defaults: vec![None; reader_fields.len()],
default_injections: Arc::from(Vec::<(usize,
AvroLiteral)>::new()),
}),
)
@@ -4374,10 +4531,9 @@ mod tests {
let projector = Projector {
writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
skip_decoders,
- field_defaults,
default_injections: Arc::from(default_injections),
};
- Decoder::Record(fields, encodings, Some(projector))
+ Decoder::Record(fields, encodings, field_defaults, Some(projector))
}
#[cfg(feature = "avro_custom_types")]
@@ -4631,10 +4787,12 @@ mod tests {
fn test_default_append_nullable_int32_null_and_value() {
let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
let mut dec = Decoder::Nullable(
- Nullability::NullFirst,
+ NullablePlan::ReadTag {
+ nullability: Nullability::NullFirst,
+ resolution: ResolutionPlan::Promotion(Promotion::Direct),
+ },
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(inner),
- NullablePlan::ReadTag,
);
dec.append_default(&AvroLiteral::Null).unwrap();
dec.append_default(&AvroLiteral::Int(11)).unwrap();
@@ -4885,29 +5043,33 @@ mod tests {
field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(),
*nullable)));
}
let enc_a = Decoder::Nullable(
- Nullability::NullSecond,
+ NullablePlan::ReadTag {
+ nullability: Nullability::NullSecond,
+ resolution: ResolutionPlan::Promotion(Promotion::Direct),
+ },
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
- NullablePlan::ReadTag,
);
let enc_b = Decoder::Nullable(
- Nullability::NullSecond,
+ NullablePlan::ReadTag {
+ nullability: Nullability::NullSecond,
+ resolution: ResolutionPlan::Promotion(Promotion::Direct),
+ },
NullBufferBuilder::new(DEFAULT_CAPACITY),
Box::new(Decoder::String(
OffsetBufferBuilder::new(DEFAULT_CAPACITY),
Vec::with_capacity(DEFAULT_CAPACITY),
)),
- NullablePlan::ReadTag,
);
encoders.push(enc_a);
encoders.push(enc_b);
+ let field_defaults = vec![None, None]; // no defaults -> append_null
let projector = Projector {
writer_to_reader: Arc::from(vec![]),
skip_decoders: vec![],
- field_defaults: vec![None, None], // no defaults -> append_null
default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
};
- let mut rec = Decoder::Record(field_refs.into(), encoders,
Some(projector));
+ let mut rec = Decoder::Record(field_refs.into(), encoders,
field_defaults, Some(projector));
let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
map.insert("a".to_string(), AvroLiteral::Int(9));
rec.append_default(&AvroLiteral::Map(map)).unwrap();
@@ -5034,7 +5196,7 @@ mod tests {
Codec::DurationSeconds,
] {
let dt = make_avro_dt(codec.clone(), None);
- let mut s = Skipper::from_avro(&dt)?;
+ let s = Skipper::from_avro(&dt)?;
for &v in &values {
let bytes = encode_avro_long(v);
let mut cursor = AvroCursor::new(&bytes);
@@ -5055,7 +5217,7 @@ mod tests {
#[test]
fn skipper_nullable_custom_duration_respects_null_first() -> Result<(),
AvroError> {
let dt = make_avro_dt(Codec::DurationNanos,
Some(Nullability::NullFirst));
- let mut s = Skipper::from_avro(&dt)?;
+ let s = Skipper::from_avro(&dt)?;
match &s {
Skipper::Nullable(Nullability::NullFirst, inner) => match **inner {
Skipper::Int64 => {}
@@ -5084,7 +5246,7 @@ mod tests {
#[test]
fn skipper_nullable_custom_duration_respects_null_second() -> Result<(),
AvroError> {
let dt = make_avro_dt(Codec::DurationMicros,
Some(Nullability::NullSecond));
- let mut s = Skipper::from_avro(&dt)?;
+ let s = Skipper::from_avro(&dt)?;
match &s {
Skipper::Nullable(Nullability::NullSecond, inner) => match **inner
{
Skipper::Int64 => {}
@@ -5115,7 +5277,7 @@ mod tests {
#[test]
fn skipper_interval_is_fixed12_and_skips_12_bytes() -> Result<(),
AvroError> {
let dt = make_avro_dt(Codec::Interval, None);
- let mut s = Skipper::from_avro(&dt)?;
+ let s = Skipper::from_avro(&dt)?;
match s {
Skipper::DurationFixed12 => {}
other => panic!("expected DurationFixed12, got {:?}", other),
@@ -5227,12 +5389,11 @@ mod tests {
Box::new(inner_values),
);
let mut dec = Decoder::Nullable(
- Nullability::NullSecond,
- NullBufferBuilder::new(DEFAULT_CAPACITY),
- Box::new(ree),
NullablePlan::FromSingle {
- promotion: Promotion::IntToDouble,
+ resolution: ResolutionPlan::Promotion(Promotion::IntToDouble),
},
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(ree),
);
for v in [1, 1, 2, 2, 2] {
let bytes = encode_avro_int(v);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 90c0d5a164..1b0c2e26f7 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -78,6 +78,16 @@ pub(crate) enum Nullability {
NullSecond,
}
+impl Nullability {
+ /// Returns the union branch index (0 or 1) of the non-null type: 1 for `NullFirst`, 0 for `NullSecond`.
+ pub(crate) fn non_null_index(&self) -> usize {
+ match self {
+ Nullability::NullFirst => 1,
+ Nullability::NullSecond => 0,
+ }
+ }
+}
+
/// Either a [`PrimitiveType`] or a reference to a previously defined named
type
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
@@ -3331,7 +3341,11 @@ mod tests {
false,
)])),
false,
- );
+ )
+ .with_metadata(HashMap::from_iter([(
+ "avro.name".to_owned(),
+ "R".to_owned(),
+ )]));
assert_eq!(resolved.field(), expected);
}
@@ -3393,7 +3407,11 @@ mod tests {
false,
)])),
false,
- );
+ )
+ .with_metadata(HashMap::from_iter([(
+ "avro.name".to_owned(),
+ "R".to_owned(),
+ )]));
assert_eq!(resolved.field(), expected);
}
@@ -3430,7 +3448,11 @@ mod tests {
)])),
])),
false,
- );
+ )
+ .with_metadata(HashMap::from_iter([(
+ "avro.name".to_owned(),
+ "R".to_owned(),
+ )]));
assert_eq!(resolved.field(), expected);
}