Shreyas220 commented on code in PR #431: URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2134003289
########## table/update_schema.go: ########## @@ -0,0 +1,532 @@ +package table + +import ( + "github.com/apache/iceberg-go" +) + +type UpdateSchema struct { + base *Metadata + schema *iceberg.Schema + idToParent map[int]int + deletes []int + updates map[int]*iceberg.NestedField + parentToAddedIDs map[int][]int + addedNameToID map[string]int + lastColumnID int + allowIncompatibleChanges bool + identifierFields map[string]struct{} + caseSensitive bool +} + +func NewUpdateSchema(base *Metadata, s *iceberg.Schema, lastColumnID int) *UpdateSchema { + identifierFields := make(map[string]struct{}) + + return &UpdateSchema{ + base: base, + schema: s, + idToParent: make(map[int]int), + deletes: make([]int, 0), + updates: make(map[int]*iceberg.NestedField), + parentToAddedIDs: make(map[int][]int), + addedNameToID: make(map[string]int), + lastColumnID: lastColumnID, + allowIncompatibleChanges: false, + identifierFields: identifierFields, + caseSensitive: true, + } +} + +// AllowIncompatibleChanges permits incompatible schema changes. +func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema { + us.allowIncompatibleChanges = true + + return us +} + +func (us *UpdateSchema) AddColumn(parent, name string, new_id int, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + parentID := -1 + fullName := "" + + if parent != "" { + parentField := us.findField(parent) + if parentField == nil { + panic("Cannot find parent struct: " + parent) + } + + // Get the parent struct's ID + parentID = parentField.ID + + // Store the parent-child relationship + us.idToParent[new_id] = parentID // Add this line + if _, ok := us.parentToAddedIDs[parentID]; !ok { + us.parentToAddedIDs[parentID] = make([]int, 0) + } + us.parentToAddedIDs[parentID] = append(us.parentToAddedIDs[parentID], new_id) + + // Use full qualified name + fullName = parent + "." 
+ name + us.addedNameToID[fullName] = new_id + + parentType := parentField.Type + + // ToDo: handle Nested columns + if nestedType, ok := parentType.(iceberg.NestedType); ok { + if mapType, ok := nestedType.(*iceberg.MapType); ok { + // fields are added to the map value type + vf := mapType.ValueField() + parentField = &vf + } else if listType, ok := nestedType.(*iceberg.ListType); ok { + // fields are added to the element type + ef := listType.ElementField() + parentField = &ef + } else if structType, ok := nestedType.(*iceberg.StructType); ok { + // fields are added to the struct + fields := structType.Fields() + parentField = &fields[len(fields)-1] + } + } else { + panic("Cannot add to non-nested DataType: " + parent + ": " + parentField.Type.String()) + } + + parentID = parentField.ID + currentField := us.findField(parent + "." + name) + + for _, id := range us.deletes { + if id == parentID { + panic("Cannot add to a column that will be deleted: " + parent) + } + } + + if currentField != nil { + foundInDeletes := false + for _, id := range us.deletes { + if id == currentField.ID { + foundInDeletes = true + + break + } + } + if !foundInDeletes { + panic("Cannot add column, name already exists: " + parent + "." + name) + } + } + _, present := us.schema.FindColumnName(parentID) + if present { + fullName = parent + "." 
+ name + } else { + fullName = name + } + + nestedField := &iceberg.NestedField{ + Name: name, + ID: new_id, + Required: required, + Type: dataType, + Doc: doc, + } + + us.updates[new_id] = nestedField + us.parentToAddedIDs[parentID] = append(us.parentToAddedIDs[parentID], new_id) + us.addedNameToID[fullName] = new_id + + } else { + + field := us.findField(name) + if field != nil { + foundInDeletes := false + for _, delete := range us.deletes { + if delete == field.ID { + foundInDeletes = true + } + } + if !foundInDeletes { + panic("Cannot add column, name already exists: " + name) + } + + } + + // cannot add a required column without a default value + if initialDefaultValue == nil && !required && !us.allowIncompatibleChanges { + panic("incompatible change: cannot add required column without a default value: " + name) + } + + nestedField := &iceberg.NestedField{ + Name: name, + ID: new_id, + Required: required, + Type: dataType, + Doc: doc, + } + + if initialDefaultValue != nil { + nestedField.InitialDefault = initialDefaultValue + } + + us.updates[new_id] = nestedField + us.parentToAddedIDs[-1] = append(us.parentToAddedIDs[-1], new_id) + us.addedNameToID[name] = new_id + } + + return us +} + +func (su *UpdateSchema) internalUpdateColumnRequirement(name string, isRequired bool) { + field := su.findForUpdate(name) + if field == nil { + panic("Cannot update missing column: " + name) + } + + if (!isRequired && !field.Required) || (isRequired && field.Required) { + // if the change is a noop, allow it even if allowIncompatibleChanges is false + return + } + + isDefaultedAdd := su.isAdded(name) && field.InitialDefault != nil + + if !isRequired && !isDefaultedAdd && !su.allowIncompatibleChanges { + panic("Cannot change column nullability: " + name + ": optional -> required") + } + + for _, id := range su.deletes { + if id == field.ID { + panic("Cannot update a column that will be deleted: " + field.Name) + } + } + + field.Required = isRequired + + su.updates[field.ID] = 
field +} + +func (us *UpdateSchema) RenameColumn(fromPath, newName string) *UpdateSchema { + field := us.findField(fromPath) + if field == nil { + panic("Cannot rename missing column: " + fromPath) + } + // block rename if the column is being deleted + for _, id := range us.deletes { + if id == field.ID { + panic("Cannot rename a column that will be deleted: " + fromPath) + } + } + + // clone or create update entry + upd, ok := us.updates[field.ID] + if !ok { + c := *field // copy + upd = &c + } + + upd.Name = newName + us.updates[field.ID] = upd + + // identifier set book-keeping + if _, ok := us.identifierFields[fromPath]; ok { + delete(us.identifierFields, fromPath) + us.identifierFields[newName] = struct{}{} + } + return us +} + +// DeleteColumn removes a column from the schema. +func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema { + field := us.findField(name) + if field == nil { + panic("Cannot delete missing column: " + name) + } + + if _, ok := us.parentToAddedIDs[field.ID]; ok { + panic("Cannot delete a column that has additions: " + name) + } + + if _, ok := us.updates[field.ID]; ok { + panic("Cannot delete a column that has updates: " + name) + } + + us.deletes = append(us.deletes, field.ID) + + return us +} + +func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) *UpdateSchema { Review Comment: I gave it a try. Please look at the current approach and let me know if that's what you were thinking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org