Shreyas220 commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2134003289


##########
table/update_schema.go:
##########
@@ -0,0 +1,532 @@
+package table
+
+import (
+       "github.com/apache/iceberg-go"
+)
+
+type UpdateSchema struct {
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+func NewUpdateSchema(base *Metadata, s *iceberg.Schema, lastColumnID int) 
*UpdateSchema {
+       identifierFields := make(map[string]struct{})
+
+       return &UpdateSchema{
+               base:                     base,
+               schema:                   s,
+               idToParent:               make(map[int]int),
+               deletes:                  make([]int, 0),
+               updates:                  make(map[int]*iceberg.NestedField),
+               parentToAddedIDs:         make(map[int][]int),
+               addedNameToID:            make(map[string]int),
+               lastColumnID:             lastColumnID,
+               allowIncompatibleChanges: false,
+               identifierFields:         identifierFields,
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes.
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+
+       return us
+}
+
+func (us *UpdateSchema) AddColumn(parent, name string, new_id int, required 
bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema 
{
+       parentID := -1
+       fullName := ""
+
+       if parent != "" {
+               parentField := us.findField(parent)
+               if parentField == nil {
+                       panic("Cannot find parent struct: " + parent)
+               }
+
+               // Get the parent struct's ID
+               parentID = parentField.ID
+
+               // Store the parent-child relationship
+               us.idToParent[new_id] = parentID // Add this line
+               if _, ok := us.parentToAddedIDs[parentID]; !ok {
+                       us.parentToAddedIDs[parentID] = make([]int, 0)
+               }
+               us.parentToAddedIDs[parentID] = 
append(us.parentToAddedIDs[parentID], new_id)
+
+               // Use full qualified name
+               fullName = parent + "." + name
+               us.addedNameToID[fullName] = new_id
+
+               parentType := parentField.Type
+
+               // ToDo: handle Nested columns
+               if nestedType, ok := parentType.(iceberg.NestedType); ok {
+                       if mapType, ok := nestedType.(*iceberg.MapType); ok {
+                               // fields are added to the map value type
+                               vf := mapType.ValueField()
+                               parentField = &vf
+                       } else if listType, ok := 
nestedType.(*iceberg.ListType); ok {
+                               // fields are added to the element type
+                               ef := listType.ElementField()
+                               parentField = &ef
+                       } else if structType, ok := 
nestedType.(*iceberg.StructType); ok {
+                               // fields are added to the struct
+                               fields := structType.Fields()
+                               parentField = &fields[len(fields)-1]
+                       }
+               } else {
+                       panic("Cannot add to non-nested DataType: " + parent + 
": " + parentField.Type.String())
+               }
+
+               parentID = parentField.ID
+               currentField := us.findField(parent + "." + name)
+
+               for _, id := range us.deletes {
+                       if id == parentID {
+                               panic("Cannot add to a column that will be 
deleted: " + parent)
+                       }
+               }
+
+               if currentField != nil {
+                       foundInDeletes := false
+                       for _, id := range us.deletes {
+                               if id == currentField.ID {
+                                       foundInDeletes = true
+
+                                       break
+                               }
+                       }
+                       if !foundInDeletes {
+                               panic("Cannot add column, name already exists: 
" + parent + "." + name)
+                       }
+               }
+               _, present := us.schema.FindColumnName(parentID)
+               if present {
+                       fullName = parent + "." + name
+               } else {
+                       fullName = name
+               }
+
+               nestedField := &iceberg.NestedField{
+                       Name:     name,
+                       ID:       new_id,
+                       Required: required,
+                       Type:     dataType,
+                       Doc:      doc,
+               }
+
+               us.updates[new_id] = nestedField
+               us.parentToAddedIDs[parentID] = 
append(us.parentToAddedIDs[parentID], new_id)
+               us.addedNameToID[fullName] = new_id
+
+       } else {
+
+               field := us.findField(name)
+               if field != nil {
+                       foundInDeletes := false
+                       for _, delete := range us.deletes {
+                               if delete == field.ID {
+                                       foundInDeletes = true
+                               }
+                       }
+                       if !foundInDeletes {
+                               panic("Cannot add column, name already exists: 
" + name)
+                       }
+
+               }
+
+               // cannot add a required column without a default value
+               if initialDefaultValue == nil && !required && 
!us.allowIncompatibleChanges {
+                       panic("incompatible change: cannot add required column 
without a default value: " + name)
+               }
+
+               nestedField := &iceberg.NestedField{
+                       Name:     name,
+                       ID:       new_id,
+                       Required: required,
+                       Type:     dataType,
+                       Doc:      doc,
+               }
+
+               if initialDefaultValue != nil {
+                       nestedField.InitialDefault = initialDefaultValue
+               }
+
+               us.updates[new_id] = nestedField
+               us.parentToAddedIDs[-1] = append(us.parentToAddedIDs[-1], 
new_id)
+               us.addedNameToID[name] = new_id
+       }
+
+       return us
+}
+
+func (su *UpdateSchema) internalUpdateColumnRequirement(name string, 
isRequired bool) {
+       field := su.findForUpdate(name)
+       if field == nil {
+               panic("Cannot update missing column: " + name)
+       }
+
+       if (!isRequired && !field.Required) || (isRequired && field.Required) {
+               // if the change is a noop, allow it even if 
allowIncompatibleChanges is false
+               return
+       }
+
+       isDefaultedAdd := su.isAdded(name) && field.InitialDefault != nil
+
+       if !isRequired && !isDefaultedAdd && !su.allowIncompatibleChanges {
+               panic("Cannot change column nullability: " + name + ": optional 
-> required")
+       }
+
+       for _, id := range su.deletes {
+               if id == field.ID {
+                       panic("Cannot update a column that will be deleted: " + 
field.Name)
+               }
+       }
+
+       field.Required = isRequired
+
+       su.updates[field.ID] = field
+}
+
+func (us *UpdateSchema) RenameColumn(fromPath, newName string) *UpdateSchema {
+       field := us.findField(fromPath)
+       if field == nil {
+               panic("Cannot rename missing column: " + fromPath)
+       }
+       // block rename if the column is being deleted
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic("Cannot rename a column that will be deleted: " + 
fromPath)
+               }
+       }
+
+       // clone or create update entry
+       upd, ok := us.updates[field.ID]
+       if !ok {
+               c := *field // copy
+               upd = &c
+       }
+
+       upd.Name = newName
+       us.updates[field.ID] = upd
+
+       // identifier set book-keeping
+       if _, ok := us.identifierFields[fromPath]; ok {
+               delete(us.identifierFields, fromPath)
+               us.identifierFields[newName] = struct{}{}
+       }
+       return us
+}
+
+// DeleteColumn removes a column from the schema.
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema {
+       field := us.findField(name)
+       if field == nil {
+               panic("Cannot delete missing column: " + name)
+       }
+
+       if _, ok := us.parentToAddedIDs[field.ID]; ok {
+               panic("Cannot delete a column that has additions: " + name)
+       }
+
+       if _, ok := us.updates[field.ID]; ok {
+               panic("Cannot delete a column that has updates: " + name)
+       }
+
+       us.deletes = append(us.deletes, field.ID)
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) 
*UpdateSchema {

Review Comment:
   I gave it a try. Please look at the current approach and let me know
   whether that's what you were thinking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to