zeroshade commented on code in PR #524:
URL: https://github.com/apache/iceberg-go/pull/524#discussion_r2334793995


##########
exprs.go:
##########
@@ -492,6 +493,63 @@ func (b *boundRef[T]) eval(st structLike) Optional[T] {
        case T:
                return Optional[T]{Valid: true, Val: v}
        default:
+               if t, ok := v.(time.Time); ok {
+                       switch b.field.Type.(type) {
+                       case DateType:
+                               days := int32(t.Truncate(24*time.Hour).Unix() / 
int64((time.Hour * 24).Seconds()))
+                               if converted, ok := any(Date(days)).(T); ok {
+                                       return Optional[T]{Valid: true, Val: 
converted}
+                               }
+                       case TimestampType, TimestampTzType:
+                               micros := Timestamp(t.UTC().UnixMicro())
+                               if converted, ok := any(micros).(T); ok {
+                                       return Optional[T]{Valid: true, Val: 
converted}
+                               }
+                       case Int32Type:
+                               days := int32(t.Truncate(24*time.Hour).Unix() / 
int64((time.Hour * 24).Seconds()))
+                               if converted, ok := any(days).(T); ok {
+                                       return Optional[T]{Valid: true, Val: 
converted}
+                               }
+                       }
+               }
+
+               if unionMap, ok := v.(map[string]any); ok {
+                       switch b.field.Type.(type) {
+                       case DecimalType:
+                               if fixedVal, exists := unionMap["fixed"]; 
exists {
+                                       if fixedArray := 
reflect.ValueOf(fixedVal); fixedArray.Kind() == reflect.Array {
+                                               bytes := make([]byte, 
fixedArray.Len())
+                                               for i := range bytes {
+                                                       bytes[i] = 
fixedArray.Index(i).Interface().(byte)
+                                               }
+
+                                               var decimal DecimalLiteral
+                                               if err := 
decimal.UnmarshalBinary(bytes); err == nil {
+                                                       if decType, ok := 
b.field.Type.(DecimalType); ok {
+                                                               result := 
Decimal{Val: decimal.Val, Scale: decType.Scale()}
+                                                               if converted, 
ok := any(result).(T); ok {
+                                                                       return 
Optional[T]{Valid: true, Val: converted}
+                                                               }
+                                                       }
+                                               }
+                                       }
+                               }
+                       case UUIDType:
+                               if uuidVal, exists := unionMap["uuid"]; exists {
+                                       if uuidArray := 
reflect.ValueOf(uuidVal); uuidArray.Kind() == reflect.Array && uuidArray.Len() 
== 16 {
+                                               var uuidBytes [16]byte
+                                               for i := 0; i < 16; i++ {
+                                                       uuidBytes[i] = 
uuidArray.Index(i).Interface().(byte)
+                                               }
+                                               result := uuid.UUID(uuidBytes)
+                                               if converted, ok := 
any(result).(T); ok {
+                                                       return 
Optional[T]{Valid: true, Val: converted}
+                                               }
+                                       }
+                               }
+                       }
+               }

Review Comment:
   if the conversion is happening in `avroPartitionData` then why would we have 
a `map[string]any` here?



##########
exprs.go:
##########
@@ -482,8 +482,33 @@ func (b *boundRef[T]) Equals(other BoundTerm) bool {
 }
 
 func (b *boundRef[T]) Ref() BoundReference { return b }
-func (b *boundRef[T]) Field() NestedField  { return b.field }
-func (b *boundRef[T]) Type() Type          { return b.field.Type }
+
+func unwrapLogicalTypeValue(v any) any {
+       if m, ok := v.(map[string]any); ok {
+               if val, exists := m["long.timestamp-micros"]; exists {
+                       if microseconds, ok := val.(int64); ok {
+                               return Timestamp(microseconds)
+                       }
+               }
+
+               if val, exists := m["int.date"]; exists {
+                       if days, ok := val.(int32); ok {
+                               return days
+                       }
+               }
+
+               if val, exists := m["long.time-micros"]; exists {
+                       if microseconds, ok := val.(int64); ok {
+                               return Time(microseconds)
+                       }
+               }
+       }
+
+       return v

Review Comment:
   if the conversion is happening in `avroPartitionData` now, then we can 
remove this function, right?



##########
manifest.go:
##########
@@ -985,7 +1029,60 @@ func constructPartitionSummaries(spec PartitionSpec, 
schema *Schema, partitions
 
        for _, part := range partitions {
                for i, field := range partType.FieldList {
-                       fieldStats[i].update(part[field.ID])
+                       value := part[field.ID]
+
+                       if _, ok := field.Type.(FixedType); ok {
+                               if bytes := extractBytesFromFixed(value); bytes 
!= nil {
+                                       value = bytes
+                               }
+                       }

Review Comment:
   can't we handle this inside of the `partitionFieldStats` object instead of 
here?



##########
manifest.go:
##########
@@ -985,7 +1029,60 @@ func constructPartitionSummaries(spec PartitionSpec, 
schema *Schema, partitions
 
        for _, part := range partitions {
                for i, field := range partType.FieldList {
-                       fieldStats[i].update(part[field.ID])
+                       value := part[field.ID]
+
+                       if _, ok := field.Type.(FixedType); ok {
+                               if bytes := extractBytesFromFixed(value); bytes 
!= nil {
+                                       value = bytes
+                               }
+                       }
+
+                       if unionMap, ok := value.(map[string]interface{}); ok {
+                               switch field.Type.(type) {
+                               case DecimalType:
+                                       decType := field.Type.(DecimalType)
+                                       if fixedBytes, ok := unionMap["fixed"]; 
ok {
+                                               if bytes := 
extractBytesFromFixed(fixedBytes); bytes != nil {
+                                                       decLit := 
DecimalLiteral{Scale: decType.Scale()}
+                                                       if err := 
decLit.UnmarshalBinary(bytes); err == nil {
+                                                               value = 
decLit.Value()
+                                                       }
+                                               }
+                                       }
+                               case TimeType:
+                                       if longVal, ok := 
unionMap["long.time-micros"]; ok {
+                                               if microseconds, ok := 
longVal.(int64); ok {
+                                                       value = 
Time(microseconds)
+                                               }
+                                       }
+                               case TimestampType:
+                                       if longVal, ok := 
unionMap["long.timestamp-micros"]; ok {
+                                               if microseconds, ok := 
longVal.(int64); ok {
+                                                       value = 
Timestamp(microseconds)
+                                               }
+                                       }
+                               case TimestampTzType:
+                                       if longVal, ok := 
unionMap["long.timestamp-micros"]; ok {
+                                               if microseconds, ok := 
longVal.(int64); ok {
+                                                       value = 
Timestamp(microseconds)
+                                               }
+                                       }

Review Comment:
   If the conversion is happening during read and write of `avroPartitionData`, 
then we should never have a map here, should we? We should always have the 
properly typed values, right?



##########
schema_conversions.go:
##########
@@ -30,33 +30,35 @@ func partitionTypeToAvroSchema(t *StructType) (avro.Schema, 
error) {
                var sc avro.Schema
                switch typ := f.Type.(type) {
                case Int32Type:
-                       sc = internal.IntSchema
+                       sc = internal.NullableSchema(internal.IntSchema)

Review Comment:
   Was a test ever added for this?



##########
table/arrow_utils.go:
##########
@@ -1222,7 +1223,23 @@ func filesToDataFiles(ctx context.Context, fileIO 
iceio.IO, meta *MetadataBuilde
                        statistics := 
format.DataFileStatsFromMeta(rdr.Metadata(), 
must(computeStatsPlan(currentSchema, meta.props)),
                                must(format.PathToIDMapping(currentSchema)))
 
-                       df := statistics.ToDataFile(currentSchema, currentSpec, 
filePath, iceberg.ParquetFile, rdr.SourceFileSize())
+                       partitionValues := make(map[int]any)
+                       if !currentSpec.Equals(*iceberg.UnpartitionedSpec) {
+                               for field := range currentSpec.Fields() {
+                                       if !field.Transform.PreservesOrder() {
+                                               yield(nil, fmt.Errorf("cannot 
infer partition value from parquet metadata for a non-linear partition field: 
%s with transform %s", field.Name, field.Transform))
+
+                                               return
+                                       }
+
+                                       partitionVal := 
statistics.PartitionValue(field, currentSchema)

Review Comment:
   This is where the conversion should happen if we need the `map` 
representation, though we could also just register the 
`Date`/`Timestamp`/`Decimal` etc. types to use the appropriate fields, right? 
That would enable us to avoid having to create the map representations 
ourselves.



##########
manifest.go:
##########
@@ -1468,33 +1595,127 @@ func mapToAvroColMap[K comparable, V any](m map[K]V) 
*[]colMap[K, V] {
        return &out
 }
 
-func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType) map[int]any {
+func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType, fixedSizes map[int]int) map[int]any {
        out := make(map[int]any)
        for k, v := range input {
                if logical, ok := logicalTypes[k]; ok {
-                       switch logical {
-                       case avro.Date:
-                               out[k] = 
Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 
24).Seconds()))
-                       case avro.TimeMillis:
-                               out[k] = Time(v.(time.Duration).Milliseconds())
-                       case avro.TimeMicros:
-                               out[k] = Time(v.(time.Duration).Microseconds())
-                       case avro.TimestampMillis:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMilli())
-                       case avro.TimestampMicros:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMicro())
-                       default:
-                               out[k] = v
-                       }
-
-                       continue
+                       out[k] = convertLogicalTypeValue(v, logical, 
fixedSizes[k])
+               } else {
+                       out[k] = convertDefaultValue(v, fixedSizes[k])
                }
-               out[k] = v
        }
 
        return out
 }
 
+func convertLogicalTypeValue(v any, logicalType avro.LogicalType, fixedSize 
int) any {
+       switch logicalType {
+       case avro.Date:
+               return convertDateValue(v)
+       case avro.TimeMicros:
+               return convertTimeMicrosValue(v)
+       case avro.TimestampMicros:
+               return convertTimestampMicrosValue(v)
+       case avro.Decimal:
+               return convertDecimalValue(v, fixedSize)
+       case avro.UUID:
+               return convertUUIDValue(v)
+       default:
+               return v
+       }
+}

Review Comment:
   this function should only exist for converting *from* the `map` 
representation into the correctly typed value. We should never be returning the 
map representation from here.



##########
manifest.go:
##########
@@ -1468,33 +1595,127 @@ func mapToAvroColMap[K comparable, V any](m map[K]V) 
*[]colMap[K, V] {
        return &out
 }
 
-func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType) map[int]any {
+func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType, fixedSizes map[int]int) map[int]any {

Review Comment:
   this function is only used for reading the partition data (not writing it) 
so the values in the map should be the actual typed values, not 
`map[string]any`.



##########
exprs.go:
##########
@@ -482,8 +482,33 @@ func (b *boundRef[T]) Equals(other BoundTerm) bool {
 }
 
 func (b *boundRef[T]) Ref() BoundReference { return b }
-func (b *boundRef[T]) Field() NestedField  { return b.field }
-func (b *boundRef[T]) Type() Type          { return b.field.Type }
+
+func unwrapLogicalTypeValue(v any) any {
+       if m, ok := v.(map[string]any); ok {
+               if val, exists := m["long.timestamp-micros"]; exists {
+                       if microseconds, ok := val.(int64); ok {
+                               return Timestamp(microseconds)
+                       }
+               }
+
+               if val, exists := m["int.date"]; exists {
+                       if days, ok := val.(int32); ok {
+                               return days
+                       }
+               }
+
+               if val, exists := m["long.time-micros"]; exists {
+                       if microseconds, ok := val.(int64); ok {
+                               return Time(microseconds)
+                       }
+               }
+       }
+
+       return v

Review Comment:
   The conversion should happen inside of `ToDataFile`, not here. We should 
never have the `map` representation here. Specifically, it should be happening 
in `DataFileStatistics.PartitionValue`; that's where we should convert from the 
literal to the map representation.



##########
manifest.go:
##########
@@ -1461,33 +1598,114 @@ func mapToAvroColMap[K comparable, V any](m map[K]V) 
*[]colMap[K, V] {
        return &out
 }
 
-func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType) map[int]any {
+func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType, fixedSizes map[int]int) map[int]any {
        out := make(map[int]any)
        for k, v := range input {
                if logical, ok := logicalTypes[k]; ok {
-                       switch logical {
-                       case avro.Date:
-                               out[k] = 
Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 
24).Seconds()))
-                       case avro.TimeMillis:
-                               out[k] = Time(v.(time.Duration).Milliseconds())
-                       case avro.TimeMicros:
-                               out[k] = Time(v.(time.Duration).Microseconds())
-                       case avro.TimestampMillis:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMilli())
-                       case avro.TimestampMicros:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMicro())
-                       default:
-                               out[k] = v
-                       }
-
-                       continue
+                       out[k] = convertLogicalTypeValue(v, logical, 
fixedSizes[k])
+               } else {
+                       out[k] = convertDefaultValue(v, fixedSizes[k])
                }
-               out[k] = v
        }
 
        return out
 }
 
+func convertLogicalTypeValue(v any, logicalType avro.LogicalType, fixedSize 
int) any {
+       switch logicalType {
+       case avro.Date:
+               return convertDateValue(v)
+       case avro.TimeMicros:
+               return convertTimeMicrosValue(v)
+       case avro.TimestampMicros:
+               return convertTimestampMicrosValue(v)
+       case avro.Decimal:
+               return convertDecimalValue(v, fixedSize)
+       default:
+               return v
+       }
+}
+
+func convertDateValue(v any) any {
+       if t, ok := v.(time.Time); ok {
+               return map[string]any{"int.date": 
int32(t.Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))}
+       }
+       if d, ok := v.(Date); ok {
+               return map[string]any{"int.date": int32(d)}
+       }
+
+       return v
+}
+
+func convertTimeMicrosValue(v any) any {
+       if t, ok := v.(Time); ok {
+               return map[string]any{"long.time-micros": int64(t)}
+       }
+       if d, ok := v.(time.Duration); ok {
+               return map[string]any{"long.time-micros": d.Microseconds()}
+       }
+
+       return v
+}
+
+func convertTimestampMicrosValue(v any) any {
+       if t, ok := v.(time.Time); ok {
+               return map[string]any{"long.timestamp-micros": 
t.UTC().UnixMicro()}
+       }
+       if ts, ok := v.(Timestamp); ok {
+               return map[string]any{"long.timestamp-micros": int64(ts)}
+       }
+
+       return v
+}
+
+func convertDecimalValue(v any, fixedSize int) any {
+       if v == nil {
+               return map[string]any{"null": nil}
+       }
+
+       dec, ok := v.(Decimal)
+       if !ok {
+               return v
+       }
+
+       bytes, err := DecimalLiteral(dec).MarshalBinary()
+       if err != nil {
+               return v
+       }
+       fixedArray := convertToFixedArray(padOrTruncateBytes(bytes, fixedSize), 
fixedSize)

Review Comment:
   Then that should be happening inside of `DataFileStatistics.PartitionValue` 
or in `DataFileStatistics.ToDataFile`. That's where we perform the writes. This 
code path only happens on read, so I don't know why we're doing this here.



##########
manifest.go:
##########
@@ -1461,33 +1520,116 @@ func mapToAvroColMap[K comparable, V any](m map[K]V) 
*[]colMap[K, V] {
        return &out
 }
 
-func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType) map[int]any {
+func avroPartitionData(input map[int]any, logicalTypes 
map[int]avro.LogicalType, fixedSizes map[int]int) map[int]any {
        out := make(map[int]any)
        for k, v := range input {
                if logical, ok := logicalTypes[k]; ok {
-                       switch logical {
-                       case avro.Date:
-                               out[k] = 
Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 
24).Seconds()))
-                       case avro.TimeMillis:
-                               out[k] = Time(v.(time.Duration).Milliseconds())
-                       case avro.TimeMicros:
-                               out[k] = Time(v.(time.Duration).Microseconds())
-                       case avro.TimestampMillis:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMilli())
-                       case avro.TimestampMicros:
-                               out[k] = 
Timestamp(v.(time.Time).UTC().UnixMicro())
-                       default:
-                               out[k] = v
-                       }
-
-                       continue
+                       out[k] = convertLogicalTypeValue(v, logical, 
fixedSizes[k])
+               } else {
+                       out[k] = convertDefaultValue(v, fixedSizes[k])
                }
-               out[k] = v
        }
 
        return out
 }
 
+func convertLogicalTypeValue(v any, logicalType avro.LogicalType, fixedSize 
int) any {
+       switch logicalType {
+       case avro.Date:
+               return convertDateValue(v)
+       case avro.TimeMicros:
+               return convertTimeMicrosValue(v)
+       case avro.TimestampMicros:
+               return convertTimestampMicrosValue(v)
+       case avro.Decimal:
+               return convertDecimalValue(v, fixedSize)
+       default:
+               return v
+       }
+}
+
+func convertDateValue(v any) any {
+       if t, ok := v.(time.Time); ok {
+               return map[string]any{"int.date": 
int32(t.Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))}
+       }
+       if d, ok := v.(Date); ok {
+               return map[string]any{"int.date": int32(d)}
+       }
+
+       return v
+}
+
+func convertTimeMicrosValue(v any) any {
+       if t, ok := v.(Time); ok {
+               return map[string]any{"long.time-micros": int64(t)}
+       }
+       if d, ok := v.(time.Duration); ok {
+               return map[string]any{"long.time-micros": d.Microseconds()}
+       }
+
+       return v
+}
+
+func convertTimestampMicrosValue(v any) any {
+       if t, ok := v.(time.Time); ok {
+               return map[string]any{"long.timestamp-micros": 
t.UTC().UnixMicro()}
+       }
+       if ts, ok := v.(Timestamp); ok {
+               return map[string]any{"long.timestamp-micros": int64(ts)}
+       }
+
+       return v
+}
+
+func convertDecimalValue(v any, fixedSize int) any {
+       dec, ok := v.(Decimal)
+       if !ok {
+               return v
+       }
+
+       bytes, err := DecimalLiteral(dec).MarshalBinary()
+       if err != nil {
+               return v
+       }
+
+       byteSize := 5
+       if fixedSize > 0 {
+               byteSize = fixedSize
+       }
+
+       return convertToFixedArray(padOrTruncateBytes(bytes, byteSize), 
byteSize)
+}
+
+func convertDefaultValue(v any, fixedSize int) any {
+       if uuidVal, ok := v.(uuid.UUID); ok {
+               return uuidVal.String()
+       }
+
+       if bytes, ok := v.([]byte); ok && fixedSize > 0 {
+               return convertToFixedArray(padOrTruncateBytes(bytes, 
fixedSize), fixedSize)
+       }
+
+       return v
+}
+
+func padOrTruncateBytes(bytes []byte, size int) []byte {
+       fixedBytes := make([]byte, size)
+       if len(bytes) <= size {
+               copy(fixedBytes[size-len(bytes):], bytes)
+       } else {
+               copy(fixedBytes[:], bytes[len(bytes)-size:])
+       }
+
+       return fixedBytes
+}
+
+func convertToFixedArray(bytes []byte, size int) any {
+       arr := reflect.New(reflect.ArrayOf(size, 
reflect.TypeOf(byte(0)))).Elem()
+       reflect.Copy(arr, reflect.ValueOf(bytes))

Review Comment:
   As with the other sections, I'm still unsure here. This codepath seems to 
*only* be used during read, so we should never have to produce the `map` 
representation here. The avro library itself should perform the conversions for 
us when dealing with fixed-size arrays.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to