This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new b196d3b fix(arrow/avro-reader): bunch of types that didn't work (#416)
b196d3b is described below
commit b196d3b316d09f63786f021d4f1baa1fdd7620d2
Author: Willem Jan <[email protected]>
AuthorDate: Tue Jul 8 20:00:32 2025 +0200
fix(arrow/avro-reader): bunch of types that didn't work (#416)
### Rationale for this change
#415
### What changes are included in this PR?
Fix decoding of Avro types that no longer worked: fixed bytes,
timestamp, time, date, and decimals.
### Are these changes tested?
Added tests to arrow/avro/reader_test.go.
### Are there any user-facing changes?
No API changes, just broader compatibility.
---------
Co-authored-by: Willem Jan Noort <[email protected]>
---
arrow/avro/reader_test.go | 253 +++++++--------------------------
arrow/avro/reader_types.go | 63 ++++++++-
arrow/avro/schema_test.go | 203 +--------------------------
arrow/avro/testdata/alltypes.avsc | 205 +++++++++++++++++++++++++++
arrow/avro/testdata/testdata.go | 287 ++++++++++++++++++++++++++++++++++++++
dev/release/rat_exclude_files.txt | 1 +
6 files changed, 612 insertions(+), 400 deletions(-)
diff --git a/arrow/avro/reader_test.go b/arrow/avro/reader_test.go
index 1b07fbb..d87948a 100644
--- a/arrow/avro/reader_test.go
+++ b/arrow/avro/reader_test.go
@@ -17,212 +17,24 @@
package avro
import (
+ "bytes"
+ "encoding/json"
"fmt"
+ "os"
+ "path/filepath"
"testing"
"github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/avro/testdata"
hamba "github.com/hamba/avro/v2"
+ "github.com/stretchr/testify/assert"
)
-func TestEditSchemaStringEqual(t *testing.T) {
+func TestReader(t *testing.T) {
tests := []struct {
- avroSchema string
arrowSchema []arrow.Field
}{
{
- avroSchema: `{
- "fields": [
- {
- "name": "inheritNull",
- "type": {
- "name": "Simple",
- "symbols": [
- "a",
- "b"
- ],
- "type": "enum"
- }
- },
- {
- "name": "explicitNamespace",
- "type": {
- "name": "test",
- "namespace":
"org.hamba.avro",
- "size": 12,
- "type": "fixed"
- }
- },
- {
- "name": "fullName",
- "type": {
- "type": "record",
- "name": "fullName_data",
- "namespace": "ignored",
- "doc": "A name
attribute with a fullname, so the namespace attribute is ignored. The fullname
is 'a.full.Name', and the namespace is 'a.full'.",
- "fields": [{
- "name":
"inheritNamespace",
- "type":
{
-
"type": "enum",
-
"name": "Understanding",
-
"doc": "A simple name (attribute) and no namespace attribute: inherit the
namespace of the enclosing type 'a.full.Name'. The fullname is
'a.full.Understanding'.",
-
"symbols": ["d", "e"]
- }
- }, {
- "name":
"md5",
- "type":
{
- "name": "md5_data",
- "type": "fixed",
-
"size": 16,
-
"namespace": "ignored"
- }
- }
- ]
- }
- },
- {
- "name": "id",
- "type": "int"
- },
- {
- "name": "bigId",
- "type": "long"
- },
- {
- "name": "temperature",
- "type": [
- "null",
- "float"
- ]
- },
- {
- "name": "fraction",
- "type": [
- "null",
- "double"
- ]
- },
- {
- "name": "is_emergency",
- "type": "boolean"
- },
- {
- "name": "remote_ip",
- "type": [
- "null",
- "bytes"
- ]
- },
- {
- "name": "person",
- "type": {
- "fields": [
- {
- "name":
"lastname",
- "type":
"string"
- },
- {
- "name":
"address",
- "type":
{
-
"fields": [
-
{
-
"name": "streetaddress",
-
"type": "string"
-
},
-
{
-
"name": "city",
-
"type": "string"
-
}
-
],
-
"name": "AddressUSRecord",
-
"type": "record"
- }
- },
- {
- "name":
"mapfield",
- "type":
{
-
"default": {
-
},
-
"type": "map",
-
"values": "long"
- }
- },
- {
- "name":
"arrayField",
- "type":
{
-
"default": [
-
],
-
"items": "string",
-
"type": "array"
- }
- }
- ],
- "name": "person_data",
- "type": "record"
- }
- },
- {
- "name": "decimalField",
- "type": {
- "logicalType":
"decimal",
- "precision": 4,
- "scale": 2,
- "type": "bytes"
- }
- },
- {
- "logicalType": "uuid",
- "name": "uuidField",
- "type": "string"
- },
- {
- "name": "timemillis",
- "type": {
- "type": "int",
- "logicalType":
"time-millis"
- }
- },
- {
- "name": "timemicros",
- "type": {
- "type": "long",
- "logicalType":
"time-micros"
- }
- },
- {
- "name": "timestampmillis",
- "type": {
- "type": "long",
- "logicalType":
"timestamp-millis"
- }
- },
- {
- "name": "timestampmicros",
- "type": {
- "type": "long",
- "logicalType":
"timestamp-micros"
- }
- },
- {
- "name": "duration",
- "type": {
- "name": "duration",
- "namespace": "whyowhy",
- "logicalType":
"duration",
- "size": 12,
- "type": "fixed"
- }
- },
- {
- "name": "date",
- "type": {
- "logicalType": "date",
- "type": "int"
- }
- }
- ],
- "name": "Example",
- "type": "record"
- }`,
arrowSchema: []arrow.Field{
{
Name: "explicitNamespace",
@@ -303,6 +115,10 @@ func TestEditSchemaStringEqual(t *testing.T) {
Name: "decimalField",
Type: &arrow.Decimal128Type{Precision:
4, Scale: 2},
},
+ {
+ Name: "decimal256Field",
+ Type: &arrow.Decimal256Type{Precision:
60, Scale: 2},
+ },
{
Name: "uuidField",
Type: arrow.BinaryTypes.String,
@@ -336,12 +152,15 @@ func TestEditSchemaStringEqual(t *testing.T) {
}
for _, test := range tests {
- t.Run("", func(t *testing.T) {
+ tp := testdata.Generate()
+ defer os.RemoveAll(filepath.Dir(tp.Avro))
+
+ t.Run("ShouldParseSchemaWithEdits", func(t *testing.T) {
want := arrow.NewSchema(test.arrowSchema, nil)
- schema, err := hamba.ParseBytes([]byte(test.avroSchema))
+ schema, err := testdata.AllTypesAvroSchema()
if err != nil {
- t.Fatalf("%v", err)
+ t.Fatal(err)
}
r := new(OCFReader)
r.avroSchema = schema.String()
@@ -354,11 +173,45 @@ func TestEditSchemaStringEqual(t *testing.T) {
if err != nil {
t.Fatalf("%v", err)
}
+ assert.Equal(t, want.String(), got.String())
if fmt.Sprintf("%+v", want.String()) !=
fmt.Sprintf("%+v", got.String()) {
t.Fatalf("got=%v,\n want=%v", got.String(),
want.String())
- } else {
- t.Logf("schema.String() comparison passed")
}
})
+
+ t.Run("ShouldLoadExpectedRecords", func(t *testing.T) {
+ b, err := os.ReadFile(tp.Avro)
+ if err != nil {
+ t.Error(err)
+ }
+ r := bytes.NewReader(b)
+
+ opts := []Option{WithChunk(-1)}
+ ar, err := NewOCFReader(r, opts...)
+ if err != nil {
+ t.Error(err)
+ }
+ defer ar.Close()
+
+ exists := ar.Next()
+
+ if ar.Err() != nil {
+ t.Error("failed to read next record: %w",
ar.Err())
+ }
+ if !exists {
+ t.Error("no record exists")
+ }
+ a, err := ar.Record().MarshalJSON()
+ assert.NoError(t, err)
+ var avroParsed []map[string]any
+ json.Unmarshal(a, &avroParsed)
+
+ j, err := os.ReadFile(tp.Json)
+ assert.NoError(t, err)
+ var jsonParsed map[string]any
+ json.Unmarshal(j, &jsonParsed)
+
+ assert.Equal(t, jsonParsed, avroParsed[0])
+ })
}
}
diff --git a/arrow/avro/reader_types.go b/arrow/avro/reader_types.go
index 50f0b18..ff21b5a 100644
--- a/arrow/avro/reader_types.go
+++ b/arrow/avro/reader_types.go
@@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"math/big"
+ "reflect"
+ "time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
@@ -29,6 +31,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/decimal256"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
+ hamba "github.com/hamba/avro/v2"
)
type dataLoader struct {
@@ -422,7 +425,11 @@ func mapFieldBuilders(b array.Builder, field arrow.Field,
parent *fieldPos) {
}
case *array.Decimal128Builder:
f.appendFunc = func(data interface{}) error {
- err := appendDecimal128Data(bt, data)
+ typ, ok := field.Type.(arrow.DecimalType)
+ if !ok {
+ return nil
+ }
+ err := appendDecimal128Data(bt, data, typ)
if err != nil {
return err
}
@@ -430,7 +437,11 @@ func mapFieldBuilders(b array.Builder, field arrow.Field,
parent *fieldPos) {
}
case *array.Decimal256Builder:
f.appendFunc = func(data interface{}) error {
- err := appendDecimal256Data(bt, data)
+ typ, ok := field.Type.(arrow.DecimalType)
+ if !ok {
+ return nil
+ }
+ err := appendDecimal256Data(bt, data, typ)
if err != nil {
return err
}
@@ -640,10 +651,12 @@ func appendDate32Data(b *array.Date32Builder, data
interface{}) {
case int32:
b.Append(arrow.Date32(v))
}
+ case time.Time:
+ b.Append(arrow.Date32FromTime(dt))
}
}
-func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error {
+func appendDecimal128Data(b *array.Decimal128Builder, data interface{}, typ
arrow.DecimalType) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
@@ -673,11 +686,19 @@ func appendDecimal128Data(b *array.Decimal128Builder,
data interface{}) error {
var bigIntData big.Int
b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
}
+ case *big.Rat:
+ v := bigRatToBigInt(dt, typ)
+
+ if v.IsInt64() {
+ b.Append(decimal128.FromI64(v.Int64()))
+ } else {
+ b.Append(decimal128.FromBigInt(v))
+ }
}
return nil
}
-func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error {
+func appendDecimal256Data(b *array.Decimal256Builder, data interface{}, typ
arrow.DecimalType) error {
switch dt := data.(type) {
case nil:
b.AppendNull()
@@ -689,10 +710,21 @@ func appendDecimal256Data(b *array.Decimal256Builder,
data interface{}) error {
var bigIntData big.Int
buf := bytes.NewBuffer(dt["bytes"].([]byte))
b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes())))
+ case *big.Rat:
+ b.Append(decimal256.FromBigInt(bigRatToBigInt(dt, typ)))
}
return nil
}
+func bigRatToBigInt(dt *big.Rat, typ arrow.DecimalType) *big.Int {
+ scale := big.NewInt(int64(typ.GetScale()))
+ scaledNum := new(big.Int).Set(dt.Num())
+ scaleFactor := new(big.Int).Exp(big.NewInt(10), scale, nil)
+ scaledNum.Mul(scaledNum, scaleFactor)
+ scaledNum.Quo(scaledNum, dt.Denom())
+ return scaledNum
+}
+
// Avro duration logical type annotates Avro fixed type of size 12, which
stores three little-endian
// unsigned integers that represent durations at different granularities of
time. The first stores
// a number in months, the second stores a number in days, and the third
stores a number in milliseconds.
@@ -717,6 +749,12 @@ func appendDurationData(b
*array.MonthDayNanoIntervalBuilder, data interface{})
dur.Nanoseconds =
int64(binary.LittleEndian.Uint32(dtb[8:]) * 1000000)
b.Append(*dur)
}
+ case hamba.LogicalDuration:
+ b.Append(arrow.MonthDayNanoInterval{
+ Months: int32(dt.Months),
+ Days: int32(dt.Days),
+ Nanoseconds: int64(dt.Milliseconds) *
int64(time.Millisecond),
+ })
}
}
@@ -733,6 +771,13 @@ func appendFixedSizeBinaryData(b
*array.FixedSizeBinaryBuilder, data interface{}
case []byte:
b.Append(v)
}
+ default:
+ v := reflect.ValueOf(data)
+ if v.Kind() == reflect.Array && v.Type().Elem().Kind() ==
reflect.Uint8 {
+ bytes := make([]byte, v.Len())
+ reflect.Copy(reflect.ValueOf(bytes), v)
+ b.Append(bytes)
+ }
}
}
@@ -839,6 +884,8 @@ func appendTime32Data(b *array.Time32Builder, data
interface{}) {
case int32:
b.Append(arrow.Time32(v))
}
+ case time.Duration:
+ b.Append(arrow.Time32(dt.Milliseconds()))
}
}
@@ -855,6 +902,8 @@ func appendTime64Data(b *array.Time64Builder, data
interface{}) {
case int64:
b.Append(arrow.Time64(v))
}
+ case time.Duration:
+ b.Append(arrow.Time64(dt.Microseconds()))
}
}
@@ -871,5 +920,11 @@ func appendTimestampData(b *array.TimestampBuilder, data
interface{}) {
case int64:
b.Append(arrow.Timestamp(v))
}
+ case time.Time:
+ v, err := arrow.TimestampFromTime(dt,
b.Type().(*arrow.TimestampType).Unit)
+ if err != nil {
+ panic(err)
+ }
+ b.Append(v)
}
}
diff --git a/arrow/avro/schema_test.go b/arrow/avro/schema_test.go
index 385bd4b..d813da0 100644
--- a/arrow/avro/schema_test.go
+++ b/arrow/avro/schema_test.go
@@ -21,208 +21,14 @@ import (
"testing"
"github.com/apache/arrow-go/v18/arrow"
- hamba "github.com/hamba/avro/v2"
+ "github.com/apache/arrow-go/v18/arrow/avro/testdata"
)
func TestSchemaStringEqual(t *testing.T) {
tests := []struct {
- avroSchema string
arrowSchema []arrow.Field
}{
{
- avroSchema: `{
- "fields": [
- {
- "name": "inheritNull",
- "type": {
- "name": "Simple",
- "symbols": [
- "a",
- "b"
- ],
- "type": "enum"
- }
- },
- {
- "name": "explicitNamespace",
- "type": {
- "name": "test",
- "namespace":
"org.hamba.avro",
- "size": 12,
- "type": "fixed"
- }
- },
- {
- "name": "fullName",
- "type": {
- "type": "record",
- "name": "fullName_data",
- "namespace": "ignored",
- "doc": "A name
attribute with a fullname, so the namespace attribute is ignored. The fullname
is 'a.full.Name', and the namespace is 'a.full'.",
- "fields": [{
- "name":
"inheritNamespace",
- "type":
{
-
"type": "enum",
-
"name": "Understanding",
-
"doc": "A simple name (attribute) and no namespace attribute: inherit the
namespace of the enclosing type 'a.full.Name'. The fullname is
'a.full.Understanding'.",
-
"symbols": ["d", "e"]
- }
- }, {
- "name":
"md5",
- "type":
{
- "name": "md5_data",
- "type": "fixed",
-
"size": 16,
-
"namespace": "ignored"
- }
- }
- ]
- }
- },
- {
- "name": "id",
- "type": "int"
- },
- {
- "name": "bigId",
- "type": "long"
- },
- {
- "name": "temperature",
- "type": [
- "null",
- "float"
- ]
- },
- {
- "name": "fraction",
- "type": [
- "null",
- "double"
- ]
- },
- {
- "name": "is_emergency",
- "type": "boolean"
- },
- {
- "name": "remote_ip",
- "type": [
- "null",
- "bytes"
- ]
- },
- {
- "name": "person",
- "type": {
- "fields": [
- {
- "name":
"lastname",
- "type":
"string"
- },
- {
- "name":
"address",
- "type":
{
-
"fields": [
-
{
-
"name": "streetaddress",
-
"type": "string"
-
},
-
{
-
"name": "city",
-
"type": "string"
-
}
-
],
-
"name": "AddressUSRecord",
-
"type": "record"
- }
- },
- {
- "name":
"mapfield",
- "type":
{
-
"default": {
-
},
-
"type": "map",
-
"values": "long"
- }
- },
- {
- "name":
"arrayField",
- "type":
{
-
"default": [
-
],
-
"items": "string",
-
"type": "array"
- }
- }
- ],
- "name": "person_data",
- "type": "record"
- }
- },
- {
- "name": "decimalField",
- "type": {
- "logicalType":
"decimal",
- "precision": 4,
- "scale": 2,
- "type": "bytes"
- }
- },
- {
- "logicalType": "uuid",
- "name": "uuidField",
- "type": "string"
- },
- {
- "name": "timemillis",
- "type": {
- "type": "int",
- "logicalType":
"time-millis"
- }
- },
- {
- "name": "timemicros",
- "type": {
- "type": "long",
- "logicalType":
"time-micros"
- }
- },
- {
- "name": "timestampmillis",
- "type": {
- "type": "long",
- "logicalType":
"timestamp-millis"
- }
- },
- {
- "name": "timestampmicros",
- "type": {
- "type": "long",
- "logicalType":
"timestamp-micros"
- }
- },
- {
- "name": "duration",
- "type": {
- "name": "duration",
- "namespace": "whyowhy",
- "logicalType":
"duration",
- "size": 12,
- "type": "fixed"
- }
- },
- {
- "name": "date",
- "type": {
- "logicalType": "date",
- "type": "int"
- }
- }
- ],
- "name": "Example",
- "type": "record"
- }`,
arrowSchema: []arrow.Field{
{
Name: "inheritNull",
@@ -309,6 +115,10 @@ func TestSchemaStringEqual(t *testing.T) {
Name: "decimalField",
Type: &arrow.Decimal128Type{Precision:
4, Scale: 2},
},
+ {
+ Name: "decimal256Field",
+ Type: &arrow.Decimal256Type{Precision:
60, Scale: 2},
+ },
{
Name: "uuidField",
Type: arrow.BinaryTypes.String,
@@ -344,7 +154,8 @@ func TestSchemaStringEqual(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
want := arrow.NewSchema(test.arrowSchema, nil)
- schema, err := hamba.ParseBytes([]byte(test.avroSchema))
+
+ schema, err := testdata.AllTypesAvroSchema()
if err != nil {
t.Fatalf("%v", err)
}
diff --git a/arrow/avro/testdata/alltypes.avsc
b/arrow/avro/testdata/alltypes.avsc
new file mode 100644
index 0000000..a4e3037
--- /dev/null
+++ b/arrow/avro/testdata/alltypes.avsc
@@ -0,0 +1,205 @@
+{
+ "fields": [
+ {
+ "name": "inheritNull",
+ "type": {
+ "name": "Simple",
+ "symbols": [
+ "a",
+ "b"
+ ],
+ "type": "enum"
+ }
+ },
+ {
+ "name": "explicitNamespace",
+ "type": {
+ "name": "test",
+ "namespace": "org.hamba.avro",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "fullName",
+ "type": {
+ "type": "record",
+ "name": "fullName_data",
+ "namespace": "ignored",
+ "doc": "A name attribute with a fullname, so the namespace attribute
is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.",
+ "fields": [
+ {
+ "name": "inheritNamespace",
+ "type": {
+ "type": "enum",
+ "name": "Understanding",
+ "doc": "A simple name (attribute) and no namespace attribute:
inherit the namespace of the enclosing type 'a.full.Name'. The fullname is
'a.full.Understanding'.",
+ "symbols": [
+ "d",
+ "e"
+ ]
+ }
+ },
+ {
+ "name": "md5",
+ "type": {
+ "name": "md5_data",
+ "type": "fixed",
+ "size": 16,
+ "namespace": "ignored"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "bigId",
+ "type": "long"
+ },
+ {
+ "name": "temperature",
+ "type": [
+ "null",
+ "float"
+ ]
+ },
+ {
+ "name": "fraction",
+ "type": [
+ "null",
+ "double"
+ ]
+ },
+ {
+ "name": "is_emergency",
+ "type": "boolean"
+ },
+ {
+ "name": "remote_ip",
+ "type": [
+ "null",
+ "bytes"
+ ]
+ },
+ {
+ "name": "person",
+ "type": {
+ "fields": [
+ {
+ "name": "lastname",
+ "type": "string"
+ },
+ {
+ "name": "address",
+ "type": {
+ "fields": [
+ {
+ "name": "streetaddress",
+ "type": "string"
+ },
+ {
+ "name": "city",
+ "type": "string"
+ }
+ ],
+ "name": "AddressUSRecord",
+ "type": "record"
+ }
+ },
+ {
+ "name": "mapfield",
+ "type": {
+ "default": {},
+ "type": "map",
+ "values": "long"
+ }
+ },
+ {
+ "name": "arrayField",
+ "type": {
+ "default": [],
+ "items": "string",
+ "type": "array"
+ }
+ }
+ ],
+ "name": "person_data",
+ "type": "record"
+ }
+ },
+ {
+ "name": "decimalField",
+ "type": {
+ "logicalType": "decimal",
+ "precision": 4,
+ "scale": 2,
+ "type": "bytes"
+ }
+ },
+ {
+ "name": "decimal256Field",
+ "type": {
+ "logicalType": "decimal",
+ "precision": 60,
+ "scale": 2,
+ "type": "bytes"
+ }
+ },
+ {
+ "logicalType": "uuid",
+ "name": "uuidField",
+ "type": "string"
+ },
+ {
+ "name": "timemillis",
+ "type": {
+ "type": "int",
+ "logicalType": "time-millis"
+ }
+ },
+ {
+ "name": "timemicros",
+ "type": {
+ "type": "long",
+ "logicalType": "time-micros"
+ }
+ },
+ {
+ "name": "timestampmillis",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ }
+ },
+ {
+ "name": "timestampmicros",
+ "type": {
+ "type": "long",
+ "logicalType": "timestamp-micros"
+ }
+ },
+ {
+ "name": "duration",
+ "type": {
+ "name": "duration",
+ "namespace": "whyowhy",
+ "logicalType": "duration",
+ "size": 12,
+ "type": "fixed"
+ }
+ },
+ {
+ "name": "date",
+ "type": {
+ "logicalType": "date",
+ "type": "int"
+ }
+ }
+ ],
+ "name": "Example",
+ "type": "record"
+}
diff --git a/arrow/avro/testdata/testdata.go b/arrow/avro/testdata/testdata.go
new file mode 100644
index 0000000..926c054
--- /dev/null
+++ b/arrow/avro/testdata/testdata.go
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package testdata
+
+import (
+ "encoding/base64"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "log"
+ "math/big"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ avro "github.com/hamba/avro/v2"
+ "github.com/hamba/avro/v2/ocf"
+)
+
+const (
+ SchemaFileName = "alltypes.avsc"
+ sampleAvroFileName = "alltypes.avro"
+ sampleJSONFileName = "alltypes.json"
+ decimalTypeScale = 2
+)
+
+type ByteArray []byte
+
+func (b ByteArray) MarshalJSON() ([]byte, error) {
+ s := fmt.Sprint(b)
+ encoded := base64.StdEncoding.EncodeToString([]byte(s))
+ return json.Marshal(encoded)
+}
+
+type TimestampMicros int64
+
+func (t TimestampMicros) MarshalJSON() ([]byte, error) {
+ ts := time.Unix(0,
int64(t)*int64(time.Microsecond)).UTC().Format("2006-01-02 15:04:05.000000")
+ // arrow record marshaller trims trailing zero digits from timestamp so
we do the same
+ return json.Marshal(fmt.Sprintf("%sZ", strings.TrimRight(ts, "0.")))
+}
+
+type TimestampMillis int64
+
+func (t TimestampMillis) MarshalJSON() ([]byte, error) {
+ ts := time.Unix(0,
int64(t)*int64(time.Millisecond)).UTC().Format("2006-01-02 15:04:05.000")
+ return json.Marshal(fmt.Sprintf("%sZ", strings.TrimRight(ts, "0.")))
+}
+
+type TimeMillis time.Duration
+
+func (t TimeMillis) MarshalJSON() ([]byte, error) {
+ ts := time.Unix(0, int64(t)).UTC().Format("15:04:05.000")
+ return json.Marshal(strings.TrimRight(ts, "0."))
+}
+
+type TimeMicros time.Duration
+
+func (t TimeMicros) MarshalJSON() ([]byte, error) {
+ ts := time.Unix(0, int64(t)).UTC().Format("15:04:05.000000")
+ return json.Marshal(strings.TrimRight(ts, "0."))
+}
+
+type ExplicitNamespace [12]byte
+
+func (t ExplicitNamespace) MarshalJSON() ([]byte, error) {
+ return json.Marshal(t[:])
+}
+
+type MD5 [16]byte
+
+func (t MD5) MarshalJSON() ([]byte, error) {
+ return json.Marshal(t[:])
+}
+
+type DecimalType []byte
+
+func (t DecimalType) MarshalJSON() ([]byte, error) {
+ v := new(big.Int).SetBytes(t)
+ s := fmt.Sprintf("%0*s", decimalTypeScale+1, v.String())
+ point := len(s) - decimalTypeScale
+ return json.Marshal(s[:point] + "." + s[point:])
+}
+
+type Duration [12]byte
+
+func (t Duration) MarshalJSON() ([]byte, error) {
+ milliseconds := int32(binary.LittleEndian.Uint32(t[8:12]))
+
+ m := map[string]interface{}{
+ "months": int32(binary.LittleEndian.Uint32(t[0:4])),
+ "days": int32(binary.LittleEndian.Uint32(t[4:8])),
+ "nanoseconds": int64(milliseconds) * int64(time.Millisecond),
+ }
+ return json.Marshal(m)
+}
+
+type Date int32
+
+func (t Date) MarshalJSON() ([]byte, error) {
+ v := time.Unix(int64(t)*86400, 0).UTC().Format("2006-01-02")
+ return json.Marshal(v)
+}
+
+type Example struct {
+ InheritNull string `avro:"inheritNull"
json:"inheritNull"`
+ ExplicitNamespace ExplicitNamespace `avro:"explicitNamespace"
json:"explicitNamespace"`
+ FullName FullNameData `avro:"fullName" json:"fullName"`
+ ID int32 `avro:"id" json:"id"`
+ BigID int64 `avro:"bigId" json:"bigId"`
+ Temperature *float32 `avro:"temperature"
json:"temperature"`
+ Fraction *float64 `avro:"fraction" json:"fraction"`
+ IsEmergency bool `avro:"is_emergency"
json:"is_emergency"`
+ RemoteIP *ByteArray `avro:"remote_ip" json:"remote_ip"`
+ Person PersonData `avro:"person" json:"person"`
+ DecimalField DecimalType `avro:"decimalField"
json:"decimalField"`
+ Decimal256Field DecimalType `avro:"decimal256Field"
json:"decimal256Field"`
+ UUIDField string `avro:"uuidField" json:"uuidField"`
+ TimeMillis TimeMillis `avro:"timemillis"
json:"timemillis"`
+ TimeMicros TimeMicros `avro:"timemicros"
json:"timemicros"`
+ TimestampMillis TimestampMillis `avro:"timestampmillis"
json:"timestampmillis"`
+ TimestampMicros TimestampMicros `avro:"timestampmicros"
json:"timestampmicros"`
+ Duration Duration `avro:"duration" json:"duration"`
+ Date Date `avro:"date" json:"date"`
+}
+
+type FullNameData struct {
+ InheritNamespace string `avro:"inheritNamespace"
json:"inheritNamespace"`
+ Md5 MD5 `avro:"md5" json:"md5"`
+}
+type MapField map[string]int64
+
+func (t MapField) MarshalJSON() ([]byte, error) {
+ arr := make([]map[string]any, 0, len(t))
+ for k, v := range t {
+ arr = append(arr, map[string]any{"key": k, "value": v})
+ }
+ return json.Marshal(arr)
+}
+
+type PersonData struct {
+ Lastname string `avro:"lastname" json:"lastname"`
+ Address AddressUSRecord `avro:"address" json:"address"`
+ Mapfield MapField `avro:"mapfield" json:"mapfield"`
+ ArrayField []string `avro:"arrayField" json:"arrayField"`
+}
+
+type AddressUSRecord struct {
+ Streetaddress string `avro:"streetaddress" json:"streetaddress"`
+ City string `avro:"city" json:"city"`
+}
+
+type TestPaths struct {
+ Avro string
+ Json string
+}
+
+func Generate() TestPaths {
+ td, err := os.MkdirTemp("", "arrow-avro-testdata-*")
+ if err != nil {
+ log.Fatalf("failed to create temp dir: %v", err)
+ }
+ data := sampleData()
+ return TestPaths{
+ Avro: writeOCFSampleData(td, data),
+ Json: writeJSONSampleData(td, data),
+ }
+}
+
+func TestdataDir() string {
+ cwd, err := os.Getwd()
+ if err != nil {
+ log.Fatalf("failed to get cwd: %v", err)
+ }
+ switch filepath.Base(cwd) {
+ case "arrow-go":
+ return filepath.Join("arrow", "avro", "testdata")
+ case "avro":
+ return "testdata"
+ case "testdata":
+ return "."
+ }
+ log.Fatalf("unexpected cwd: %s", cwd)
+ return ""
+}
+
+func AllTypesAvroSchema() (avro.Schema, error) {
+ sp := filepath.Join(TestdataDir(), SchemaFileName)
+ avroSchemaBytes, err := os.ReadFile(sp)
+ if err != nil {
+ return nil, err
+ }
+ return avro.ParseBytes(avroSchemaBytes)
+}
+
+func sampleData() Example {
+ return Example{
+ InheritNull: "a",
+ ExplicitNamespace: ExplicitNamespace{1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12},
+ FullName: FullNameData{
+ InheritNamespace: "d",
+ Md5: MD5{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15},
+ },
+ ID: 42,
+ BigID: 42000000000,
+ Temperature: func() *float32 { v := float32(36.6); return &v
}(),
+ Fraction: func() *float64 { v := float64(0.75); return &v
}(),
+ IsEmergency: true,
+ RemoteIP: func() *ByteArray { v := ByteArray{192, 168, 1,
1}; return &v }(),
+ Person: PersonData{
+ Lastname: "Doe",
+ Address: AddressUSRecord{
+ Streetaddress: "123 Main St",
+ City: "Metropolis",
+ },
+ Mapfield: MapField{"foo": 123},
+ ArrayField: []string{"one", "two"},
+ },
+ DecimalField: DecimalType{0x00, 0x00, 0x00, 0x00, 0x00, 0x26,
0x94},
+ Decimal256Field: DecimalType{
+ 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0,
+ 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
+ 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x01,
+ },
+ UUIDField: "123e4567-e89b-12d3-a456-426614174000",
+ TimeMillis: TimeMillis(50412345 * time.Millisecond),
+ TimeMicros: TimeMicros(50412345678 * time.Microsecond),
+ TimestampMillis: TimestampMillis(time.Now().UnixNano() /
int64(time.Millisecond)),
+ TimestampMicros: TimestampMicros(time.Now().UnixNano() /
int64(time.Microsecond)),
+ Duration: Duration{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12},
+ Date: Date(time.Now().Unix() / 86400),
+ }
+}
+
+func writeOCFSampleData(td string, data Example) string {
+ path := filepath.Join(td, sampleAvroFileName)
+ ocfFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
0644)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer ocfFile.Close()
+ schema, err := AllTypesAvroSchema()
+ if err != nil {
+ log.Fatal(err)
+ }
+ encoder, err := ocf.NewEncoder(schema.String(), ocfFile)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer encoder.Close()
+
+ err = encoder.Encode(data)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return path
+}
+
+func writeJSONSampleData(td string, data Example) string {
+ path := filepath.Join(td, sampleJSONFileName)
+ jsonFile, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC,
0644)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer jsonFile.Close()
+ enc := json.NewEncoder(jsonFile)
+ err = enc.Encode(data)
+ if err != nil {
+ log.Fatal(err)
+ }
+ return path
+}
diff --git a/dev/release/rat_exclude_files.txt
b/dev/release/rat_exclude_files.txt
index adad69e..76c9c81 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -21,6 +21,7 @@ go.sum
.github/pull_request_template.md
+arrow/avro/testdata/alltypes.avsc
arrow/flight/gen/flight/*.pb.go
arrow/type_string.go
arrow/unionmode_string.go
