This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 17bd39c Added Schema::independent_canonical_form (#66)
17bd39c is described below
commit 17bd39c4e59e11161d84545a0c78e776763f315e
Author: chupaty <[email protected]>
AuthorDate: Mon Jan 20 18:37:42 2025 +1000
Added Schema::independent_canonical_form (#66)
* Added Schema::independent_canonical_form
Added independent_canonical_form, which inlines the definitions of the names
available in the schemata, so that the given schema can be used without the schemata.
* rust fmt
* clippy fix
* unused imports
* Fix for nested record usage
* cargo fmt
* Allow independent_canonical_form() to fail if ref is not found
* cargo fmt
* clippy cleanup
* Minor cleanup and better naming
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
avro/src/schema.rs | 109 +++++++++++++----
avro/tests/schema.rs | 336 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 423 insertions(+), 22 deletions(-)
diff --git a/avro/src/schema.rs b/avro/src/schema.rs
index 93dbe18..1737665 100644
--- a/avro/src/schema.rs
+++ b/avro/src/schema.rs
@@ -34,7 +34,7 @@ use serde::{
};
use serde_json::{Map, Value};
use std::{
- borrow::{Borrow, Cow},
+ borrow::Borrow,
collections::{BTreeMap, HashMap, HashSet},
fmt,
fmt::Debug,
@@ -1041,7 +1041,19 @@ impl Schema {
pub fn canonical_form(&self) -> String {
let json = serde_json::to_value(self)
.unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
- parsing_canonical_form(&json)
+ let mut defined_names = HashSet::new();
+ parsing_canonical_form(&json, &mut defined_names)
+ }
+
+ /// Returns the [Parsing Canonical Form] of `self` that is self contained (not dependent on
+ /// any definitions in `schemata`)
+ ///
+ /// [Parsing Canonical Form]:
+ /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
+ pub fn independent_canonical_form(&self, schemata: &Vec<Schema>) ->
Result<String, Error> {
+ let mut this = self.clone();
+ this.denormalize(schemata)?;
+ Ok(this.canonical_form())
}
/// Generate [fingerprint] of Schema's [Parsing Canonical Form].
@@ -1246,6 +1258,41 @@ impl Schema {
attributes,
})
}
+
+ fn denormalize(&mut self, schemata: &Vec<Schema>) -> AvroResult<()> {
+ match self {
+ Schema::Ref { name } => {
+ let replacement_schema = schemata
+ .iter()
+ .find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
+ if let Some(schema) = replacement_schema {
+ let mut denorm = schema.clone();
+ denorm.denormalize(schemata)?;
+ *self = denorm;
+ } else {
+ return Err(Error::SchemaResolutionError(name.clone()));
+ }
+ }
+ Schema::Record(record_schema) => {
+ for field in &mut record_schema.fields {
+ field.schema.denormalize(schemata)?;
+ }
+ }
+ Schema::Array(array_schema) => {
+ array_schema.items.denormalize(schemata)?;
+ }
+ Schema::Map(map_schema) => {
+ map_schema.types.denormalize(schemata)?;
+ }
+ Schema::Union(union_schema) => {
+ for schema in &mut union_schema.schemas {
+ schema.denormalize(schemata)?;
+ }
+ }
+ _ => (),
+ }
+ Ok(())
+ }
}
impl Parser {
@@ -2245,19 +2292,39 @@ impl Serialize for RecordField {
/// Parses a **valid** avro schema into the Parsing Canonical Form.
/// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
-fn parsing_canonical_form(schema: &Value) -> String {
+fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>)
-> String {
match schema {
- Value::Object(map) => pcf_map(map),
+ Value::Object(map) => pcf_map(map, defined_names),
Value::String(s) => pcf_string(s),
- Value::Array(v) => pcf_array(v),
+ Value::Array(v) => pcf_array(v, defined_names),
json => panic!("got invalid JSON value for canonical form of schema:
{json}"),
}
}
-fn pcf_map(schema: &Map<String, Value>) -> String {
+fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>)
-> String {
// Look for the namespace variant up front.
let ns = schema.get("namespace").and_then(|v| v.as_str());
let typ = schema.get("type").and_then(|v| v.as_str());
+ let raw_name = schema.get("name").and_then(|v| v.as_str());
+ let name = if is_named_type(typ) {
+ Some(format!(
+ "{}{}",
+ ns.map_or("".to_string(), |n| { format!("{n}.") }),
+ raw_name.unwrap_or_default()
+ ))
+ } else {
+ None
+ };
+
+ //if this is already a defined type, early return
+ if let Some(ref n) = name {
+ if defined_names.contains(n) {
+ return pcf_string(n);
+ } else {
+ defined_names.insert(n.clone());
+ }
+ }
+
let mut fields = Vec::new();
for (k, v) in schema {
// Reduce primitive types to their simple form. ([PRIMITIVE] rule)
@@ -2280,17 +2347,10 @@ fn pcf_map(schema: &Map<String, Value>) -> String {
// Fully qualify the name, if it isn't already ([FULLNAMES] rule).
if k == "name" {
- // Invariant: Only valid schemas. Must be a string.
- let name = v.as_str().unwrap();
- let n = match ns {
- Some(namespace) if is_named_type(typ) && !name.contains('.')
=> {
- Cow::Owned(format!("{namespace}.{name}"))
- }
- _ => Cow::Borrowed(name),
- };
-
- fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n))));
- continue;
+ if let Some(ref n) = name {
+ fields.push(("name", format!("{}:{}", pcf_string(k),
pcf_string(n))));
+ continue;
+ }
}
// Strip off quotes surrounding "size" type, if they exist ([INTEGERS]
rule).
@@ -2306,7 +2366,11 @@ fn pcf_map(schema: &Map<String, Value>) -> String {
// For anything else, recursively process the result.
fields.push((
k,
- format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
+ format!(
+ "{}:{}",
+ pcf_string(k),
+ parsing_canonical_form(v, defined_names)
+ ),
));
}
@@ -2327,10 +2391,10 @@ fn is_named_type(typ: Option<&str>) -> bool {
)
}
-fn pcf_array(arr: &[Value]) -> String {
+fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
let inter = arr
.iter()
- .map(parsing_canonical_form)
+ .map(|a| parsing_canonical_form(a, defined_names))
.collect::<Vec<String>>()
.join(",");
format!("[{inter}]")
@@ -2376,6 +2440,7 @@ pub trait AvroSchema {
#[cfg(feature = "derive")]
pub mod derive {
use super::*;
+ use std::borrow::Cow;
/// Trait for types that serve as fully defined components inside an Avro
data model. Derive
/// implementation available through `derive` feature. This is what is
implemented by
@@ -3424,7 +3489,7 @@ mod tests {
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
- let expected =
r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
+ let expected =
r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
@@ -3508,7 +3573,7 @@ mod tests {
assert_eq!(schema, expected);
let canonical_form = &schema.canonical_form();
- let expected =
r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
+ let expected =
r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
assert_eq!(canonical_form, &expected);
Ok(())
diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs
index 13cf6af..0c90615 100644
--- a/avro/tests/schema.rs
+++ b/avro/tests/schema.rs
@@ -2017,3 +2017,339 @@ fn test_avro_3851_read_default_value_for_enum() ->
TestResult {
Ok(())
}
+
+#[test]
+fn avro_rs_66_test_independent_canonical_form_primitives() -> TestResult {
+ init();
+ let record_primitive = r#"{
+ "name": "Rec",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {"name": "v", "type": "int"}
+ ]
+ }"#;
+
+ let enum_primitive = r#"{
+ "name": "En",
+ "type": "enum",
+ "symbols": [ "bar0", "bar1" ]
+ }"#;
+
+ let fixed_primitive = r#"{
+ "name": "Fix",
+ "type": "fixed",
+ "size": 4
+ }"#;
+
+ let record_with_dependencies = r#"{
+ "name": "RecWithDeps",
+ "type": "record",
+ "fields": [
+ {"name": "v1", "type": "ns.Rec"},
+ {"name": "v2", "type": "En"},
+ {"name": "v3", "type": "Fix"},
+ {"name": "v4", "type": "ns.Rec"},
+ {"name": "v5", "type": "En"},
+ {"name": "v6", "type": "Fix"}
+ ]
+ }"#;
+
+ let record_with_no_dependencies = r#"{
+ "name": "RecWithDeps",
+ "type": "record",
+ "fields": [
+ {
+ "name": "v1", "type": {
+ "name": "Rec",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {"name": "v", "type": "int"}
+ ]
+ }
+ },
+ {
+ "name": "v2", "type": {
+ "name": "En",
+ "type": "enum",
+ "symbols": [ "bar0", "bar1" ]
+ }
+ },
+ {"name": "v3", "type":
+ {
+ "name": "Fix",
+ "type": "fixed",
+ "size": 4
+ }
+ },
+ {"name": "v4", "type": "ns.Rec"},
+ {"name": "v5", "type": "En"},
+ {"name": "v6", "type": "Fix"}
+ ]
+ }"#;
+
+ let independent_schema = Schema::parse_str(record_with_no_dependencies)?;
+ let schema_strs = [
+ fixed_primitive,
+ enum_primitive,
+ record_primitive,
+ record_with_dependencies,
+ ];
+
+ for schema_str_perm in permutations(&schema_strs) {
+ let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s|
**s).collect();
+ let schemata = Schema::parse_list(&schema_str_perm)?;
+ assert_eq!(schemata.len(), schema_strs.len());
+ let test_schema = schemata
+ .iter()
+ .find(|a| a.name().unwrap().to_string() == *"RecWithDeps")
+ .unwrap();
+
+ assert_eq!(
+ independent_schema.independent_canonical_form(&schemata)?,
+ independent_schema.canonical_form()
+ );
+
+ assert_eq!(
+ independent_schema.canonical_form(),
+ test_schema.independent_canonical_form(&schemata)?
+ );
+ }
+ Ok(())
+}
+
+#[test]
+fn avro_rs_66_test_independent_canonical_form_usages() -> TestResult {
+ init();
+ let record_primitive = r#"{
+ "name": "Rec",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {"name": "v", "type": "int"}
+ ]
+ }"#;
+
+ let record_usage = r#"{
+ "name": "RecUsage",
+ "type": "record",
+ "fields": [
+ {"name": "v1", "type": "ns.Rec"},
+ {"name": "v2", "type": "ns.Rec"}
+ ]
+ }"#;
+ let record_usage_independent = r#"{
+ "name": "RecUsage",
+ "type": "record",
+ "fields": [
+ {"name": "v1", "type": {
+ "name": "ns.Rec", "type": "record","fields": [{"name": "v",
"type": "int"}]}
+ },
+ {"name": "v2", "type": "ns.Rec"}
+ ]
+ }"#;
+
+ let array_usage = r#"{
+ "name": "ArrayUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": {"type": "array", "items":
"ns.Rec"}},
+ {"name": "field_two", "type": {"type": "array", "items": "ns.Rec"}}
+ ]
+ }"#;
+ let array_usage_independent = r#"{
+ "name": "ArrayUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": {"type": "array", "items": {
+ "name": "ns.Rec", "type": "record","fields": [{"name": "v",
"type": "int"}]}
+ }},
+ {"name": "field_two", "type": {"type": "array", "items": "ns.Rec"}}
+ ]
+ }"#;
+
+ let union_usage = r#"{
+ "name": "UnionUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": ["null", "ns.Rec"]},
+ {"name": "field_two", "type": ["null", "ns.Rec"]}
+ ]
+ }"#;
+ let union_usage_independent = r#"{
+ "name": "UnionUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": ["null", {
+ "name": "ns.Rec", "type": "record","fields": [{"name": "v",
"type": "int"}]}
+ ]},
+ {"name": "field_two", "type": ["null", "ns.Rec"]}
+ ]
+ }"#;
+
+ let map_usage = r#"{
+ "name": "MapUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": {"type": "map", "values": "ns.Rec"}},
+ {"name": "field_two", "type": {"type": "map", "values": "ns.Rec"}}
+ ]
+ }"#;
+ let map_usage_independent = r#"{
+ "name": "MapUsage",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": {"type": "map", "values": {
+ "name": "ns.Rec", "type": "record","fields": [{"name": "v",
"type": "int"}]}
+ }},
+ {"name": "field_two", "type": {"type": "map", "values": "ns.Rec"}}
+ ]
+ }"#;
+
+ let schema_strs = [
+ record_primitive,
+ record_usage,
+ array_usage,
+ map_usage,
+ union_usage,
+ ];
+
+ for schema_str_perm in permutations(&schema_strs) {
+ let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s|
**s).collect();
+ let schemata = Schema::parse_list(&schema_str_perm)?;
+ for schema in &schemata {
+ match schema.name().unwrap().to_string().as_str() {
+ "RecUsage" => {
+ assert_eq!(
+ schema.independent_canonical_form(&schemata)?,
+
Schema::parse_str(record_usage_independent)?.canonical_form()
+ );
+ }
+ "ArrayUsage" => {
+ assert_eq!(
+ schema.independent_canonical_form(&schemata)?,
+
Schema::parse_str(array_usage_independent)?.canonical_form()
+ );
+ }
+ "UnionUsage" => {
+ assert_eq!(
+ schema.independent_canonical_form(&schemata)?,
+
Schema::parse_str(union_usage_independent)?.canonical_form()
+ );
+ }
+ "MapUsage" => {
+ assert_eq!(
+ schema.independent_canonical_form(&schemata)?,
+
Schema::parse_str(map_usage_independent)?.canonical_form()
+ );
+ }
+ "ns.Rec" => {
+ assert_eq!(
+ schema.independent_canonical_form(&schemata)?,
+ schema.canonical_form()
+ );
+ }
+ other => unreachable!("Unknown schema name: {}", other),
+ }
+ }
+ }
+ Ok(())
+}
+
+#[test]
+fn avro_rs_66_test_independent_canonical_form_deep_recursion() -> TestResult {
+ init();
+ let record_primitive = r#"{
+ "name": "Rec",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {"name": "v", "type": "int"}
+ ]
+ }"#;
+
+ let record_usage = r#"{
+ "name": "RecUsage",
+ "type": "record",
+ "fields": [
+ {"name": "v1", "type": "ns.Rec"},
+ {"name": "v2", "type": "ns.Rec"}
+ ]
+ }"#;
+
+ let record_usage_usage = r#"{
+ "name": "RecUsageUsage",
+ "type": "record",
+ "fields": [
+ {"name": "r1", "type": "RecUsage"},
+ {"name": "r2", "type": "RecUsage"}
+ ]
+ }"#;
+
+ let record_usage_usage_independent = r#"{
+ "name": "RecUsageUsage",
+ "type": "record",
+ "fields": [
+ {"name": "r1", "type": {
+ "name": "RecUsage",
+ "type": "record",
+ "fields": [
+ {
+ "name": "v1", "type": {
+ "name": "ns.Rec", "type": "record","fields":
[{"name": "v", "type": "int"}]
+ }
+ },
+ {"name": "v2", "type": "ns.Rec"}
+ ]
+ }},
+ {"name": "r2", "type": "RecUsage"}
+ ]
+
+ }"#;
+
+ let schema_strs = [record_primitive, record_usage, record_usage_usage];
+
+ for schema_str_perm in permutations(&schema_strs) {
+ let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s|
**s).collect();
+ let schemata = Schema::parse_list(&schema_str_perm)?;
+ let ruu = schemata
+ .iter()
+ .find(|s| s.name().unwrap().to_string().as_str() ==
"RecUsageUsage")
+ .unwrap();
+ assert_eq!(
+ ruu.independent_canonical_form(&schemata)?,
+ Schema::parse_str(record_usage_usage_independent)?.canonical_form()
+ );
+ }
+ Ok(())
+}
+
+#[test]
+fn avro_rs_66_test_independent_canonical_form_missing_ref() -> TestResult {
+ init();
+ let record_primitive = r#"{
+ "name": "Rec",
+ "namespace": "ns",
+ "type": "record",
+ "fields": [
+ {"name": "v", "type": "int"}
+ ]
+ }"#;
+
+ let record_usage = r#"{
+ "name": "RecUsage",
+ "type": "record",
+ "fields": [
+ {"name": "v1", "type": "ns.Rec"}
+ ]
+ }"#;
+
+ let schema_strs = [record_primitive, record_usage];
+ let schemata = Schema::parse_list(&schema_strs)?;
+ assert!(matches!(
+ schemata[1].independent_canonical_form(&Vec::with_capacity(0)), //NOTE
- we're passing in an empty schemata
+ Err(Error::SchemaResolutionError(..))
+ ));
+ Ok(())
+}