xixipi-lining commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2238276172


##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+       txn          *Transaction
+       schema       *iceberg.Schema
+       lastColumnID int
+
+       operations []SchemaOperation
+       errors     []error // Collect validation errors
+
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+       Apply(builder *schemaBuilder) error
+       Validate(schema *iceberg.Schema, settings *validationSettings) error
+       String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+       allowIncompatibleChanges bool
+       caseSensitive            bool
+}
+
+type schemaBuilder struct {
+       deletes      map[int]struct{}
+       updates      map[int]*iceberg.NestedField
+       adds         map[int][]*iceberg.NestedField
+       moves        map[int][]moveReq
+       lastColumnID int
+       baseSchema   *iceberg.Schema
+       settings     *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the 
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the 
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+       path           []string
+       required       bool
+       dataType       iceberg.Type
+       doc            string
+       initialDefault any
+}
+
+type updateColumnOp struct {
+       path    []string
+       updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+       path []string
+}
+
+type moveColumnOp struct {
+       columnToMove    []string
+       referenceColumn []string
+       op              moveOp
+}
+
+type ColumnUpdate struct {
+       Type     iceberg.Optional[iceberg.Type] // nil means no change
+       Doc      iceberg.Optional[string]       // nil means no change
+       Default  any                            // nil means no change
+       Required iceberg.Optional[bool]         // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+       OpFirst  moveOp = "first"
+       OpBefore moveOp = "before"
+       OpAfter  moveOp = "after"
+)
+
+type moveReq struct {
+       fieldID      int
+       otherFieldID int // 0 when opFirst
+       op           moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int) 
*UpdateSchema {
+       return &UpdateSchema{
+               txn:                      txn,
+               schema:                   s,
+               lastColumnID:             lastColumnID,
+               operations:               make([]SchemaOperation, 0),
+               errors:                   make([]error, 0),
+               allowIncompatibleChanges: false,
+               identifierFields:         make(map[string]struct{}),
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+       return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+       us.caseSensitive = caseSensitive
+       return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build 
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType 
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+       op := &addColumnOp{
+               path:           path,
+               required:       required,
+               dataType:       dataType,
+               doc:            doc,
+               initialDefault: initialDefault,
+       }
+       us.operations = append(us.operations, op)
+       return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until 
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate) 
*UpdateSchema {
+       op := &updateColumnOp{
+               path:    path,
+               updates: updates,
+       }
+       us.operations = append(us.operations, op)
+       return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until 
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+       op := &deleteColumnOp{
+               path: path,
+       }
+       us.operations = append(us.operations, op)
+       return us
+}
+
+// Move queues a move column operation - validation deferred until Build is 
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op 
moveOp) *UpdateSchema {
+       moveOp := &moveColumnOp{
+               columnToMove:    columnToMove,
+               referenceColumn: referenceColumn,
+               op:              op,
+       }
+       us.operations = append(us.operations, moveOp)
+       return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until 
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+       us.operations = us.operations[:0]
+       us.errors = us.errors[:0]
+       return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+       if len(us.operations) > 0 {
+               us.operations = us.operations[:len(us.operations)-1]
+       }
+       return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+       result := make([]string, len(us.operations))
+       for i, op := range us.operations {
+               result[i] = op.String()
+       }
+       return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+       // Clear the previous errors
+       // because if validation is called multiple times, the errors will 
accumulate even after removing operations causing error
+       us.errors = us.errors[:0]
+
+       settings := &validationSettings{
+               allowIncompatibleChanges: us.allowIncompatibleChanges,
+               caseSensitive:            us.caseSensitive,
+       }
+
+       for _, op := range us.operations {
+               if err := op.Validate(us.schema, settings); err != nil {
+                       us.errors = append(us.errors, err)
+               }
+       }
+
+       if len(us.errors) > 0 {
+               return fmt.Errorf("validation failed with %d errors: %v", 
len(us.errors), us.errors)
+       }
+       return nil
+}
+
+// Build validates and constructs the final schema
+func (us *UpdateSchema) Build() (*iceberg.Schema, error) {
+       // First validate all operations
+       if err := us.Validate(); err != nil {
+               return nil, err
+       }
+
+       // No operations means no changes
+       if len(us.operations) == 0 {
+               return us.schema, nil
+       }
+
+       // Create builder and apply operations
+       builder := &schemaBuilder{
+               deletes:      make(map[int]struct{}),
+               updates:      make(map[int]*iceberg.NestedField),
+               adds:         make(map[int][]*iceberg.NestedField),
+               moves:        make(map[int][]moveReq),
+               lastColumnID: us.lastColumnID,
+               baseSchema:   us.schema,
+               settings: &validationSettings{
+                       allowIncompatibleChanges: us.allowIncompatibleChanges,
+                       caseSensitive:            us.caseSensitive,
+               },
+       }
+
+       // Apply operations in order
+       for _, op := range us.operations {
+               if err := op.Apply(builder); err != nil {
+                       return nil, fmt.Errorf("failed to apply operation %s: 
%w", op.String(), err)
+               }
+       }
+
+       // Build final schema
+       return us.buildFinalSchema(builder)
+}
+
+// Commit validates, builds, and commits the schema changes
+func (us *UpdateSchema) Commit() error {
+       updates, requirements, err := us.CommitUpdates()
+       if err != nil {
+               return err
+       }
+
+       if len(updates) == 0 {
+               return nil
+       }
+
+       return us.txn.apply(updates, requirements)
+}
+
+// CommitUpdates returns the updates and requirements needed
+func (us *UpdateSchema) CommitUpdates() ([]Update, []Requirement, error) {
+       // If there are no operations, return nil updates and requirements
+       if len(us.operations) == 0 {
+               return nil, nil, nil
+       }
+
+       newSchema, err := us.Build()
+       if err != nil {
+               return nil, nil, err
+       }
+
+       // Check if equivalent schema exists
+       existingSchemaID := us.findExistingSchemaInTransaction(newSchema)
+
+       var updates []Update
+       var requirements []Requirement
+
+       // for Commit Contention
+       requirements = append(requirements, AssertCurrentSchemaID(us.schema.ID))
+
+       if existingSchemaID == nil {
+               // Get the final lastColumnID from the builder
+               builder := &schemaBuilder{
+                       deletes:      make(map[int]struct{}),
+                       updates:      make(map[int]*iceberg.NestedField),
+                       adds:         make(map[int][]*iceberg.NestedField),
+                       moves:        make(map[int][]moveReq),
+                       lastColumnID: us.lastColumnID,
+                       baseSchema:   us.schema,
+                       settings: &validationSettings{
+                               allowIncompatibleChanges: 
us.allowIncompatibleChanges,
+                               caseSensitive:            us.caseSensitive,
+                       },
+               }
+
+               for _, op := range us.operations {
+                       if err := op.Apply(builder); err != nil {
+                               return nil, nil, fmt.Errorf("failed to 
recompute lastColumnID: %w", err)
+                       }
+               }
+               updates = append(updates,
+                       NewAddSchemaUpdate(newSchema, builder.lastColumnID, 
false),
+                       NewSetCurrentSchemaUpdate(newSchema.ID),
+               )
+       } else {
+               updates = append(updates, 
NewSetCurrentSchemaUpdate(*existingSchemaID))
+       }
+
+       if nameMapUpdates := us.getNameMappingUpdates(newSchema); 
len(nameMapUpdates) > 0 {
+               updates = append(updates, nameMapUpdates...)
+       }
+
+       return updates, requirements, nil
+}
+
+/// AddColumn Operation
+
+// Implementation of Operation interface for addColumnOp
+func (op *addColumnOp) Validate(schema *iceberg.Schema, settings 
*validationSettings) error {
+       if len(op.path) == 0 {
+               return errors.New("AddColumn: path must contain at least the 
new column name")
+       }
+
+       // Check if field already exists
+       if existing := findField(schema, op.path, settings.caseSensitive); 
existing != nil {
+               return fmt.Errorf("cannot add column; name already exists: %s", 
strings.Join(op.path, "."))
+       }
+
+       // Validate parent exists and is a nested type
+       if len(op.path) > 1 {
+               parentPath := op.path[:len(op.path)-1]
+               pf := findField(schema, parentPath, settings.caseSensitive)
+               if pf == nil {
+                       return fmt.Errorf("cannot find parent struct: %s", 
strings.Join(parentPath, "."))
+               }
+
+               switch pf.Type.(type) {
+               case *iceberg.StructType, *iceberg.MapType, *iceberg.ListType:

Review Comment:
   If the parent field found this way is a MapType, we need to check that its 
value type is a struct (StructType). Likewise, for a ListType, we need to check 
that its element type is a struct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to