nandorKollar commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2133671891


##########
table/update_schema.go:
##########
@@ -0,0 +1,532 @@
+package table
+
import (
	"reflect"

	"github.com/apache/iceberg-go"
)
+
+type UpdateSchema struct {
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+func NewUpdateSchema(base *Metadata, s *iceberg.Schema, lastColumnID int) 
*UpdateSchema {
+       identifierFields := make(map[string]struct{})
+
+       return &UpdateSchema{
+               base:                     base,
+               schema:                   s,
+               idToParent:               make(map[int]int),
+               deletes:                  make([]int, 0),
+               updates:                  make(map[int]*iceberg.NestedField),
+               parentToAddedIDs:         make(map[int][]int),
+               addedNameToID:            make(map[string]int),
+               lastColumnID:             lastColumnID,
+               allowIncompatibleChanges: false,
+               identifierFields:         identifierFields,
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes.
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+
+       return us
+}
+
+func (us *UpdateSchema) AddColumn(parent, name string, new_id int, required 
bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema 
{
+       parentID := -1
+       fullName := ""
+
+       if parent != "" {
+               parentField := us.findField(parent)
+               if parentField == nil {
+                       panic("Cannot find parent struct: " + parent)
+               }
+
+               // Get the parent struct's ID
+               parentID = parentField.ID
+
+               // Store the parent-child relationship
+               us.idToParent[new_id] = parentID // Add this line
+               if _, ok := us.parentToAddedIDs[parentID]; !ok {
+                       us.parentToAddedIDs[parentID] = make([]int, 0)
+               }
+               us.parentToAddedIDs[parentID] = 
append(us.parentToAddedIDs[parentID], new_id)
+
+               // Use full qualified name
+               fullName = parent + "." + name
+               us.addedNameToID[fullName] = new_id
+
+               parentType := parentField.Type
+
+               // ToDo: handle Nested columns
+               if nestedType, ok := parentType.(iceberg.NestedType); ok {
+                       if mapType, ok := nestedType.(*iceberg.MapType); ok {
+                               // fields are added to the map value type
+                               vf := mapType.ValueField()
+                               parentField = &vf
+                       } else if listType, ok := 
nestedType.(*iceberg.ListType); ok {
+                               // fields are added to the element type
+                               ef := listType.ElementField()
+                               parentField = &ef
+                       } else if structType, ok := 
nestedType.(*iceberg.StructType); ok {
+                               // fields are added to the struct
+                               fields := structType.Fields()
+                               parentField = &fields[len(fields)-1]
+                       }
+               } else {
+                       panic("Cannot add to non-nested DataType: " + parent + 
": " + parentField.Type.String())
+               }
+
+               parentID = parentField.ID
+               currentField := us.findField(parent + "." + name)
+
+               for _, id := range us.deletes {
+                       if id == parentID {
+                               panic("Cannot add to a column that will be 
deleted: " + parent)
+                       }
+               }
+
+               if currentField != nil {
+                       foundInDeletes := false
+                       for _, id := range us.deletes {
+                               if id == currentField.ID {
+                                       foundInDeletes = true
+
+                                       break
+                               }
+                       }
+                       if !foundInDeletes {
+                               panic("Cannot add column, name already exists: 
" + parent + "." + name)
+                       }
+               }
+               _, present := us.schema.FindColumnName(parentID)
+               if present {
+                       fullName = parent + "." + name
+               } else {
+                       fullName = name
+               }
+
+               nestedField := &iceberg.NestedField{
+                       Name:     name,
+                       ID:       new_id,
+                       Required: required,
+                       Type:     dataType,
+                       Doc:      doc,
+               }
+
+               us.updates[new_id] = nestedField
+               us.parentToAddedIDs[parentID] = 
append(us.parentToAddedIDs[parentID], new_id)
+               us.addedNameToID[fullName] = new_id
+
+       } else {
+
+               field := us.findField(name)
+               if field != nil {
+                       foundInDeletes := false
+                       for _, delete := range us.deletes {
+                               if delete == field.ID {
+                                       foundInDeletes = true
+                               }
+                       }
+                       if !foundInDeletes {
+                               panic("Cannot add column, name already exists: 
" + name)
+                       }
+
+               }
+
+               // cannot add a required column without a default value
+               if initialDefaultValue == nil && !required && 
!us.allowIncompatibleChanges {
+                       panic("incompatible change: cannot add required column 
without a default value: " + name)
+               }
+
+               nestedField := &iceberg.NestedField{
+                       Name:     name,
+                       ID:       new_id,
+                       Required: required,
+                       Type:     dataType,
+                       Doc:      doc,
+               }
+
+               if initialDefaultValue != nil {
+                       nestedField.InitialDefault = initialDefaultValue
+               }
+
+               us.updates[new_id] = nestedField
+               us.parentToAddedIDs[-1] = append(us.parentToAddedIDs[-1], 
new_id)
+               us.addedNameToID[name] = new_id
+       }
+
+       return us
+}
+
+func (su *UpdateSchema) internalUpdateColumnRequirement(name string, 
isRequired bool) {
+       field := su.findForUpdate(name)
+       if field == nil {
+               panic("Cannot update missing column: " + name)
+       }
+
+       if (!isRequired && !field.Required) || (isRequired && field.Required) {
+               // if the change is a noop, allow it even if 
allowIncompatibleChanges is false
+               return
+       }
+
+       isDefaultedAdd := su.isAdded(name) && field.InitialDefault != nil
+
+       if !isRequired && !isDefaultedAdd && !su.allowIncompatibleChanges {
+               panic("Cannot change column nullability: " + name + ": optional 
-> required")
+       }
+
+       for _, id := range su.deletes {
+               if id == field.ID {
+                       panic("Cannot update a column that will be deleted: " + 
field.Name)
+               }
+       }
+
+       field.Required = isRequired
+
+       su.updates[field.ID] = field
+}
+
+func (us *UpdateSchema) RenameColumn(fromPath, newName string) *UpdateSchema {
+       field := us.findField(fromPath)
+       if field == nil {
+               panic("Cannot rename missing column: " + fromPath)
+       }
+       // block rename if the column is being deleted
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic("Cannot rename a column that will be deleted: " + 
fromPath)
+               }
+       }
+
+       // clone or create update entry
+       upd, ok := us.updates[field.ID]
+       if !ok {
+               c := *field // copy
+               upd = &c
+       }
+
+       upd.Name = newName
+       us.updates[field.ID] = upd
+
+       // identifier set book-keeping
+       if _, ok := us.identifierFields[fromPath]; ok {
+               delete(us.identifierFields, fromPath)
+               us.identifierFields[newName] = struct{}{}
+       }
+       return us
+}
+
+// DeleteColumn removes a column from the schema.
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema {
+       field := us.findField(name)
+       if field == nil {
+               panic("Cannot delete missing column: " + name)
+       }
+
+       if _, ok := us.parentToAddedIDs[field.ID]; ok {
+               panic("Cannot delete a column that has additions: " + name)
+       }
+
+       if _, ok := us.updates[field.ID]; ok {
+               panic("Cannot delete a column that has updates: " + name)
+       }
+
+       us.deletes = append(us.deletes, field.ID)
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic("Cannot update type of missing column: " + name)
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic("Cannot update a column that will be deleted: " + 
field.Name)
+               }
+       }
+
+       if field.Type.Equals(newType) {
+               return us
+       }
+
+       // check promotion
+       if !allowedPromotion(field.Type, newType) {
+               panic("Cannot update type of column: " + field.Name + ": " + 
newType.String())
+       }
+
+       us.updates[field.ID].Type = newType
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema 
{
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic("Cannot update type of missing column: " + name)
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic("Cannot update a column that will be deleted: " + 
field.Name)
+               }
+       }
+
+       if field.Doc == doc {
+               return us
+       }
+
+       us.updates[field.ID].Doc = doc
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic("Cannot update default value of missing column: " + name)
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic("Cannot update a column that will be deleted: " + 
field.Name)
+               }
+       }
+
+       if field.InitialDefault == defaultValue {
+               return us
+       }
+
+       us.updates[field.ID].InitialDefault = defaultValue
+
+       return us
+}
+
+// RequireColumn changes an optional column to required.
+func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, true)
+
+       return us
+}
+
+// MakeColumnOptional changes a required column to optional.
+func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, false)
+
+       return us
+}
+
+func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField {
+       existing := us.findField(name)
+       if existing != nil {
+               if update, ok := us.updates[existing.ID]; ok {
+                       return update
+               }
+
+               // adding to updates
+               us.updates[existing.ID] = existing
+
+               return existing
+       }
+
+       addedID, ok := us.addedNameToID[name]
+       if ok {
+               return us.updates[addedID]
+       }
+
+       return nil
+}
+
+func (us *UpdateSchema) Apply() *iceberg.Schema {
+       return us.applyChanges()
+}
+
+func (su *UpdateSchema) isAdded(name string) bool {
+       _, ok := su.addedNameToID[name]
+
+       return ok
+}
+
+func (us *UpdateSchema) assignNewColumnID() int {
+       next := us.lastColumnID + 1
+       us.lastColumnID = next
+
+       return next
+}
+
+func (us *UpdateSchema) findField(name string) *iceberg.NestedField {
+       if us.caseSensitive {
+               field, ok := us.schema.FindFieldByName(name)
+               if !ok {
+                       return nil
+               }
+
+               return &field
+       }
+       field, ok := us.schema.FindFieldByNameCaseInsensitive(name)
+       if !ok {
+               return nil
+       }
+
+       return &field
+}
+
+func allowedPromotion(oldType, newType iceberg.Type) bool {
+       switch old := oldType.(type) {
+       case iceberg.PrimitiveType:
+               switch old.Type() {
+               case "int":
+                       return newType.Type() == "long"
+               case "float":
+                       return newType.Type() == "double"
+               case "string":
+                       return newType.Type() == "binary" // widening
+               case "fixed":
+                       return newType.Type() == "binary"
+               case "time":

Review Comment:
   Is time -> timestamp promotion allowed? From the spec, it appears to me that
only date -> timestamp promotion is allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to