zeroshade commented on code in PR #1112:
URL: https://github.com/apache/iceberg-go/pull/1112#discussion_r3290531123


##########
table/partitioned_fanout_writer.go:
##########
@@ -51,6 +52,12 @@ type partitionInfo struct {
        partitionRec    partitionRecord // The actual partition values for 
generating the path
 }
 
+type partitionFieldInfo struct {
+       sourceField *iceberg.PartitionField

Review Comment:
   `PartitionField` is a small struct; why use a pointer here instead of just 
storing it by value?



##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row 
int) (iceberg.Literal
                return nil, fmt.Errorf("unsupported value type: %T", val)
        }
 }
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType 
iceberg.Type) (iceberg.Literal, error) {
+       timestampType := arr.DataType().(*arrow.TimestampType)
+       value := int64(arr.Value(row))
+
+       switch sourceType.(type) {
+       case iceberg.TimestampType, iceberg.TimestampTzType:
+               micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+       case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
+               nanos, err := arrowTimestampToNanos(value, timestampType.Unit)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewLiteral(iceberg.TimestampNano(nanos)), nil
+       default:
+               return nil, fmt.Errorf("cannot convert arrow timestamp to 
iceberg literal for source type %v", sourceType)
+       }
+}
+
+func arrowTimestampToMicros(value int64, unit arrow.TimeUnit) (int64, error) {
+       switch unit {
+       case arrow.Second:
+               return scaleTimestamp(value, 1_000_000)
+       case arrow.Millisecond:
+               return scaleTimestamp(value, 1_000)
+       case arrow.Microsecond:
+               return value, nil
+       case arrow.Nanosecond:
+               return floorDivInt64(value, 1_000), nil
+       default:
+               return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", 
unit)
+       }
+}
+
+func arrowTimestampToNanos(value int64, unit arrow.TimeUnit) (int64, error) {
+       switch unit {
+       case arrow.Second:
+               return scaleTimestamp(value, 1_000_000_000)
+       case arrow.Millisecond:
+               return scaleTimestamp(value, 1_000_000)
+       case arrow.Microsecond:
+               return scaleTimestamp(value, 1_000)
+       case arrow.Nanosecond:
+               return value, nil
+       default:
+               return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", 
unit)
+       }
+}
+
+func scaleTimestamp(value, factor int64) (int64, error) {
+       if (value > 0 && value > math.MaxInt64/factor) ||
+               (value < 0 && value < math.MinInt64/factor) {
+               return 0, fmt.Errorf("arrow timestamp value %d overflows int64 
when scaled by %d", value, factor)
+       }
+
+       return value * factor, nil
+}
+
+func floorDivInt64(a, b int64) int64 {
+       d := a / b
+       if (a^b) < 0 && d*b != a {
+               d--
+       }
+
+       return d
+}

Review Comment:
   This already exists in the root transforms.go file. We should probably just 
move the version at transforms.go:579 into an internal/utils.go file and then 
use it in both places rather than duplicating this function.



##########
table/partitioned_fanout_writer.go:
##########
@@ -214,28 +221,33 @@ func getRecordPartitions(spec iceberg.PartitionSpec, 
schema *iceberg.Schema, rec
        partitionRec := make(partitionRecord, len(partitionFields))
 
        partitionColumns := make([]arrow.Array, len(partitionFields))
-       partitionFieldsInfo := make([]struct {
-               sourceField *iceberg.PartitionField
-               fieldID     int
-       }, len(partitionFields))
+       partitionFieldsInfo := make([]partitionFieldInfo, len(partitionFields))
 
        for i := range partitionFields {
                sourceField := spec.Field(i)
-               colName, _ := schema.FindColumnName(sourceField.SourceID())
-               colIdx := record.Schema().FieldIndices(colName)[0]
-               partitionColumns[i] = record.Column(colIdx)
-               partitionFieldsInfo[i] = struct {
-                       sourceField *iceberg.PartitionField
-                       fieldID     int
-               }{&sourceField, sourceField.FieldID}
+               colName, ok := schema.FindColumnName(sourceField.SourceID())
+               if !ok {
+                       return nil, fmt.Errorf("failed to find source field ID 
%d in schema", sourceField.SourceID())
+               }
+               colIndices := record.Schema().FieldIndices(colName)
+               if len(colIndices) == 0 {
+                       return nil, fmt.Errorf("failed to find source column %q 
in record schema", colName)
+               }
+               sourceType, ok := schema.FindTypeByID(sourceField.SourceID())
+               if !ok {
+                       return nil, fmt.Errorf("failed to find source field ID 
%d in schema", sourceField.SourceID())
+               }
+               partitionColumns[i] = record.Column(colIndices[0])
+               partitionFieldsInfo[i] = partitionFieldInfo{&sourceField, 
sourceField.FieldID, sourceType}

Review Comment:
   ```suggestion
                partitionFieldsInfo[i] = partitionFieldInfo{
                    sourceField: &sourceField, 
                    fieldID:          sourceField.FieldID, 
                    sourceType: sourceType,
                }
   ```
   
   Using named fields just ensures we don't accidentally misorder things.



##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row 
int) (iceberg.Literal
                return nil, fmt.Errorf("unsupported value type: %T", val)
        }
 }
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType 
iceberg.Type) (iceberg.Literal, error) {
+       timestampType := arr.DataType().(*arrow.TimestampType)
+       value := int64(arr.Value(row))
+
+       switch sourceType.(type) {
+       case iceberg.TimestampType, iceberg.TimestampTzType:
+               micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+       case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
+               nanos, err := arrowTimestampToNanos(value, timestampType.Unit)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewLiteral(iceberg.TimestampNano(nanos)), nil
+       default:
+               return nil, fmt.Errorf("cannot convert arrow timestamp to 
iceberg literal for source type %v", sourceType)
+       }
+}
+
+func arrowTimestampToMicros(value int64, unit arrow.TimeUnit) (int64, error) {
+       switch unit {
+       case arrow.Second:
+               return scaleTimestamp(value, 1_000_000)
+       case arrow.Millisecond:
+               return scaleTimestamp(value, 1_000)
+       case arrow.Microsecond:
+               return value, nil
+       case arrow.Nanosecond:
+               return floorDivInt64(value, 1_000), nil
+       default:
+               return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", 
unit)
+       }
+}
+
+func arrowTimestampToNanos(value int64, unit arrow.TimeUnit) (int64, error) {
+       switch unit {
+       case arrow.Second:
+               return scaleTimestamp(value, 1_000_000_000)
+       case arrow.Millisecond:
+               return scaleTimestamp(value, 1_000_000)
+       case arrow.Microsecond:
+               return scaleTimestamp(value, 1_000)
+       case arrow.Nanosecond:
+               return value, nil
+       default:
+               return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", 
unit)
+       }
+}
+
+func scaleTimestamp(value, factor int64) (int64, error) {
+       if (value > 0 && value > math.MaxInt64/factor) ||
+               (value < 0 && value < math.MinInt64/factor) {
+               return 0, fmt.Errorf("arrow timestamp value %d overflows int64 
when scaled by %d", value, factor)
+       }

Review Comment:
   Can you add a test that covers this overflow path? I don't think it's 
covered by the current tests.



##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row 
int) (iceberg.Literal
                return nil, fmt.Errorf("unsupported value type: %T", val)
        }
 }
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType 
iceberg.Type) (iceberg.Literal, error) {
+       timestampType := arr.DataType().(*arrow.TimestampType)
+       value := int64(arr.Value(row))
+
+       switch sourceType.(type) {
+       case iceberg.TimestampType, iceberg.TimestampTzType:
+               micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+               if err != nil {
+                       return nil, err
+               }
+
+               return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+       case iceberg.TimestampNsType, iceberg.TimestampTzNsType:

Review Comment:
   The Tz variants don't seem to get tested. Can you add cases that have 
`TimeZone: "UTC"` so we hit this case?



##########
table/partitioned_fanout_writer.go:
##########
@@ -214,28 +221,33 @@ func getRecordPartitions(spec iceberg.PartitionSpec, 
schema *iceberg.Schema, rec
        partitionRec := make(partitionRecord, len(partitionFields))
 
        partitionColumns := make([]arrow.Array, len(partitionFields))
-       partitionFieldsInfo := make([]struct {
-               sourceField *iceberg.PartitionField
-               fieldID     int
-       }, len(partitionFields))
+       partitionFieldsInfo := make([]partitionFieldInfo, len(partitionFields))
 
        for i := range partitionFields {
                sourceField := spec.Field(i)
-               colName, _ := schema.FindColumnName(sourceField.SourceID())
-               colIdx := record.Schema().FieldIndices(colName)[0]
-               partitionColumns[i] = record.Column(colIdx)
-               partitionFieldsInfo[i] = struct {
-                       sourceField *iceberg.PartitionField
-                       fieldID     int
-               }{&sourceField, sourceField.FieldID}
+               colName, ok := schema.FindColumnName(sourceField.SourceID())
+               if !ok {
+                       return nil, fmt.Errorf("failed to find source field ID 
%d in schema", sourceField.SourceID())
+               }
+               colIndices := record.Schema().FieldIndices(colName)
+               if len(colIndices) == 0 {
+                       return nil, fmt.Errorf("failed to find source column %q 
in record schema", colName)
+               }
+               sourceType, ok := schema.FindTypeByID(sourceField.SourceID())
+               if !ok {
+                       return nil, fmt.Errorf("failed to find source field ID 
%d in schema", sourceField.SourceID())

Review Comment:
   Can we use something like "failed to find type for source field ID" to 
distinguish this error from the identical one above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to