This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2cf2b29 feat(parquet/pqarrow): Correctly handle Variant types in
schema (#433)
2cf2b29 is described below
commit 2cf2b297672f37895c83a42bfaa1ac08352efc16
Author: Matt Topol <[email protected]>
AuthorDate: Tue Jul 8 13:36:15 2025 -0400
feat(parquet/pqarrow): Correctly handle Variant types in schema (#433)
### Rationale for this change
This updates the `pqarrow` package to handle the variant extension type when
converting between Arrow and Parquet schemas.
### What changes are included in this PR?
The TODOs are replaced with implementations that handle shredded variant
structures during schema conversion.
### Are these changes tested?
A unit test is added for shredded variant handling.
### Are there any user-facing changes?
The only user-facing change is that this conversion is now supported instead of returning an error.
---
arrow/extensions/variant.go | 8 ++++
parquet/pqarrow/schema.go | 22 ++++++---
parquet/pqarrow/schema_test.go | 104 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 127 insertions(+), 7 deletions(-)
diff --git a/arrow/extensions/variant.go b/arrow/extensions/variant.go
index 7098b77..fbef4a6 100644
--- a/arrow/extensions/variant.go
+++ b/arrow/extensions/variant.go
@@ -168,6 +168,14 @@ func (v *VariantType) Value() arrow.Field {
return v.StorageType().(*arrow.StructType).Field(v.valueFieldIdx)
}
+func (v *VariantType) TypedValue() arrow.Field {
+ if v.typedValueFieldIdx == -1 {
+ return arrow.Field{}
+ }
+
+ return v.StorageType().(*arrow.StructType).Field(v.typedValueFieldIdx)
+}
+
func (*VariantType) ExtensionName() string { return "parquet.variant" }
func (v *VariantType) String() string {
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 34e4cc6..17603b9 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -253,11 +253,19 @@ func variantToNode(t *extensions.VariantType, field
arrow.Field, props *parquet.
return nil, err
}
- //TODO: implement shredding
+ fields := schema.FieldList{metadataNode, valueNode}
+
+ typedField := t.TypedValue()
+ if typedField.Type != nil {
+ typedNode, err := fieldToNode("typed_value", typedField, props,
arrProps)
+ if err != nil {
+ return nil, err
+ }
+ fields = append(fields, typedNode)
+ }
return schema.NewGroupNodeLogical(field.Name,
repFromNullable(field.Nullable),
- schema.FieldList{metadataNode, valueNode},
schema.VariantLogicalType{},
- fieldIDFromMeta(field.Metadata))
+ fields, schema.VariantLogicalType{},
fieldIDFromMeta(field.Metadata))
}
func structToNode(field arrow.Field, props *parquet.WriterProperties, arrprops
ArrowWriterProperties) (schema.Node, error) {
@@ -857,10 +865,10 @@ func mapToSchemaField(n *schema.GroupNode, currentLevels
file.LevelInfo, ctx *sc
}
func variantToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo,
ctx *schemaTree, _, out *SchemaField) error {
- // this is for unshredded variants. shredded variants may have more
fields
- // TODO: implement support for shredded variants
- if n.NumFields() != 2 {
- return errors.New("VARIANT group must have exactly 2 children")
+ switch n.NumFields() {
+ case 2, 3:
+ default:
+ return errors.New("VARIANT group must have exactly 2 or 3
children")
}
var err error
diff --git a/parquet/pqarrow/schema_test.go b/parquet/pqarrow/schema_test.go
index 58475dc..6f3da88 100644
--- a/parquet/pqarrow/schema_test.go
+++ b/parquet/pqarrow/schema_test.go
@@ -534,3 +534,107 @@ func TestConvertSchemaParquetVariant(t *testing.T) {
require.NoError(t, err)
assert.True(t, pqschema.Equals(sc), pqschema.String(), sc.String())
}
+
+func TestShreddedVariantSchema(t *testing.T) {
+ metaNoFieldID := arrow.NewMetadata([]string{"PARQUET:field_id"},
[]string{"-1"})
+
+ s := arrow.StructOf(
+ arrow.Field{Name: "metadata", Type: arrow.BinaryTypes.Binary,
Metadata: metaNoFieldID},
+ arrow.Field{Name: "value", Type: arrow.BinaryTypes.Binary,
Nullable: true, Metadata: metaNoFieldID},
+ arrow.Field{Name: "typed_value", Type: arrow.StructOf(
+ arrow.Field{Name: "tsmicro", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Timestamp_us, Nullable: true, Metadata: metaNoFieldID},
+ ), Metadata: metaNoFieldID},
+ arrow.Field{Name: "strval", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
+ arrow.Field{Name: "typed_value", Type:
arrow.BinaryTypes.String, Nullable: true, Metadata: metaNoFieldID},
+ ), Metadata: metaNoFieldID},
+ arrow.Field{Name: "bool", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
+ arrow.Field{Name: "typed_value", Type:
arrow.FixedWidthTypes.Boolean, Nullable: true, Metadata: metaNoFieldID},
+ ), Metadata: metaNoFieldID},
+ arrow.Field{Name: "uuid", Type: arrow.StructOf(
+ arrow.Field{Name: "value", Type:
arrow.BinaryTypes.Binary, Nullable: true, Metadata: metaNoFieldID},
+ arrow.Field{Name: "typed_value", Type:
extensions.NewUUIDType(), Nullable: true, Metadata: metaNoFieldID},
+ ), Metadata: metaNoFieldID},
+ ), Nullable: true, Metadata: metaNoFieldID})
+
+ vt, err := extensions.NewVariantType(s)
+ require.NoError(t, err)
+
+ arrSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "variant_col", Type: vt, Nullable: true, Metadata:
metaNoFieldID},
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false,
Metadata: metaNoFieldID},
+ }, nil)
+
+ sc, err := pqarrow.ToParquet(arrSchema, nil,
pqarrow.DefaultWriterProps())
+ require.NoError(t, err)
+
+ // the equivalent shredded variant parquet schema looks like this:
+ // repeated group field_id=-1 schema {
+ // optional group field_id=-1 variant_col (Variant) {
+ // required byte_array field_id=-1 metadata;
+ // optional byte_array field_id=-1 value;
+ // optional group field_id=-1 typed_value {
+ // required group field_id=-1 tsmicro {
+ // optional byte_array field_id=-1 value;
+ // optional int64 field_id=-1 typed_value
(Timestamp(isAdjustedToUTC=true, timeUnit=microseconds,
is_from_converted_type=false, force_set_converted_type=true));
+ // }
+ // required group field_id=-1 strval {
+ // optional byte_array field_id=-1 value;
+ // optional byte_array field_id=-1 typed_value (String);
+ // }
+ // required group field_id=-1 bool {
+ // optional byte_array field_id=-1 value;
+ // optional boolean field_id=-1 typed_value;
+ // }
+ // required group field_id=-1 uuid {
+ // optional byte_array field_id=-1 value;
+ // optional fixed_len_byte_array field_id=-1 typed_value (UUID);
+ // }
+ // }
+ // }
+ // required int64 field_id=-1 id (Int(bitWidth=64, isSigned=true));
+ // }
+
+ expected :=
schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema",
+ parquet.Repetitions.Repeated, schema.FieldList{
+ schema.Must(schema.NewGroupNodeLogical("variant_col",
parquet.Repetitions.Optional, schema.FieldList{
+
schema.MustPrimitive(schema.NewPrimitiveNode("metadata",
parquet.Repetitions.Required, parquet.Types.ByteArray, -1, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNode("value",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
+
schema.MustGroup(schema.NewGroupNode("typed_value",
parquet.Repetitions.Optional, schema.FieldList{
+
schema.MustGroup(schema.NewGroupNode("tsmicro", parquet.Repetitions.Required,
schema.FieldList{
+
schema.MustPrimitive(schema.NewPrimitiveNode("value",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value",
parquet.Repetitions.Optional, schema.NewTimestampLogicalTypeWithOpts(
+
schema.WithTSTimeUnitType(schema.TimeUnitMicros),
schema.WithTSIsAdjustedToUTC(), schema.WithTSForceConverted(),
+ ), parquet.Types.Int64, -1,
-1)),
+ }, -1)),
+
schema.MustGroup(schema.NewGroupNode("strval", parquet.Repetitions.Required,
schema.FieldList{
+
schema.MustPrimitive(schema.NewPrimitiveNode("value",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value",
parquet.Repetitions.Optional,
+
schema.StringLogicalType{}, parquet.Types.ByteArray, -1, -1)),
+ }, -1)),
+
schema.MustGroup(schema.NewGroupNode("bool", parquet.Repetitions.Required,
schema.FieldList{
+
schema.MustPrimitive(schema.NewPrimitiveNode("value",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNode("typed_value",
parquet.Repetitions.Optional,
+ parquet.Types.Boolean,
-1, -1)),
+ }, -1)),
+
schema.MustGroup(schema.NewGroupNode("uuid", parquet.Repetitions.Required,
schema.FieldList{
+
schema.MustPrimitive(schema.NewPrimitiveNode("value",
parquet.Repetitions.Optional, parquet.Types.ByteArray, -1, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("typed_value",
parquet.Repetitions.Optional,
+
schema.UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16, -1)),
+ }, -1)),
+ }, -1)),
+ }, schema.VariantLogicalType{}, -1)),
+
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("id",
parquet.Repetitions.Required,
+ schema.NewIntLogicalType(64, true),
parquet.Types.Int64, -1, -1)),
+ }, -1)))
+
+ assert.True(t, sc.Equals(expected), "expected: %s\ngot: %s", expected,
sc)
+
+ arrsc, err := pqarrow.FromParquet(sc, nil, metadata.KeyValueMetadata{})
+ require.NoError(t, err)
+
+ assert.True(t, arrSchema.Equal(arrsc), "expected: %s\ngot: %s",
arrSchema, arrsc)
+}