laskoviymishka commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2126896000


##########
table/update_schema.go:
##########
@@ -0,0 +1,484 @@
+package table
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+type UpdateSchema struct {
+       ops                      Operation
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+// 
══════════════════════════════USER-FACING═══════════════════════════════════║

Review Comment:
   no need to specify it, it's already obvious from the code itself.



##########
table/update_schema.go:
##########
@@ -0,0 +1,484 @@
+package table
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+type UpdateSchema struct {
+       ops                      Operation
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+// 
══════════════════════════════USER-FACING═══════════════════════════════════║
+
+func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, 
lastColumnID int) *UpdateSchema {
+       identifierFields := make(map[string]struct{})
+
+       return &UpdateSchema{
+               ops:                      ops,
+               base:                     base,
+               schema:                   s,
+               idToParent:               make(map[int]int),
+               deletes:                  make([]int, 0),
+               updates:                  make(map[int]*iceberg.NestedField),
+               parentToAddedIDs:         make(map[int][]int),
+               addedNameToID:            make(map[string]int),
+               lastColumnID:             lastColumnID,
+               allowIncompatibleChanges: false,
+               identifierFields:         identifierFields,
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes.
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+       return us
+}
+
+// AddNestedColumn adds a nested column to the schema.
+func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, 
dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn(parent, name, us.assignNewColumnID(), 
required, dataType, doc, initialDefaultValue)
+}
+
+// AddColumn adds a column to the schema.
+func (us *UpdateSchema) AddColumn(name string, required bool, dataType 
iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), required, 
dataType, doc, initialDefaultValue)
+}
+
+// AddRequiredColumn adds a required column to the schema.
+func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), true, 
dataType, doc, initialDefaultValue)
+}
+
+// AddOptionalColumn adds an optional column to the schema.
+func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), false, 
dataType, doc, initialDefaultValue)
+}
+
+// DeleteColumn removes a column from the schema.
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema {
+
+       field := us.findField(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot delete missing column: %s", name))
+       }
+
+       if _, ok := us.parentToAddedIDs[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has additions: 
%s", name))
+       }
+
+       if _, ok := us.updates[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has updates: 
%s", name))
+       }
+
+       us.deletes = append(us.deletes, field.ID)
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Type.Equals(newType) {
+               return us
+       }
+
+       //check promotion
+       if !allowedPromotion(field.Type, newType) {
+               panic(fmt.Sprintf("Cannot update type of column: %s: %s", 
field.Name, newType))
+       }
+
+       us.updates[field.ID].Type = newType
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema 
{
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Doc == doc {
+               return us
+       }
+
+       us.updates[field.ID].Doc = doc
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update default value of missing 
column: %s", name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.InitialDefault == defaultValue {
+               return us
+       }
+
+       us.updates[field.ID].InitialDefault = defaultValue
+       return us
+}
+
+// RequireColumn changes an optional column to required.
+func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, true)
+       return us
+}
+
+// MakeColumnOptional changes a required column to optional.
+func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, false)
+       return us
+}
+
+func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField {
+       existing := us.findField(name)
+       if existing != nil {
+               if update, ok := us.updates[existing.ID]; ok {
+                       return update
+               }
+
+               // adding to updates
+               us.updates[existing.ID] = existing
+               return existing
+       }
+
+       addedID, ok := us.addedNameToID[name]
+       if ok {
+               return us.updates[addedID]
+       }
+
+       return nil
+}
+
+func (us *UpdateSchema) Apply() *iceberg.Schema {
+       return us.applyChanges()
+}
+
+// 
══════════════════════════════INTERNAL══════════════════════════════════════║

Review Comment:
   no need to specify it, it's already visible from the language itself.



##########
table/update_schema.go:
##########
@@ -0,0 +1,484 @@
+package table
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+type UpdateSchema struct {
+       ops                      Operation
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+// 
══════════════════════════════USER-FACING═══════════════════════════════════║
+
+func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, 
lastColumnID int) *UpdateSchema {
+       identifierFields := make(map[string]struct{})
+
+       return &UpdateSchema{
+               ops:                      ops,
+               base:                     base,
+               schema:                   s,
+               idToParent:               make(map[int]int),
+               deletes:                  make([]int, 0),
+               updates:                  make(map[int]*iceberg.NestedField),
+               parentToAddedIDs:         make(map[int][]int),
+               addedNameToID:            make(map[string]int),
+               lastColumnID:             lastColumnID,
+               allowIncompatibleChanges: false,
+               identifierFields:         identifierFields,
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes.
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+       return us
+}
+
+// AddNestedColumn adds a nested column to the schema.
+func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, 
dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn(parent, name, us.assignNewColumnID(), 
required, dataType, doc, initialDefaultValue)
+}
+
+// AddColumn adds a column to the schema.
+func (us *UpdateSchema) AddColumn(name string, required bool, dataType 
iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), required, 
dataType, doc, initialDefaultValue)
+}
+
+// AddRequiredColumn adds a required column to the schema.
+func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), true, 
dataType, doc, initialDefaultValue)
+}
+
+// AddOptionalColumn adds an optional column to the schema.
+func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), false, 
dataType, doc, initialDefaultValue)
+}
+
+// DeleteColumn removes a column from the schema.
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema {
+
+       field := us.findField(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot delete missing column: %s", name))
+       }
+
+       if _, ok := us.parentToAddedIDs[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has additions: 
%s", name))
+       }
+
+       if _, ok := us.updates[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has updates: 
%s", name))
+       }
+
+       us.deletes = append(us.deletes, field.ID)
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Type.Equals(newType) {
+               return us
+       }
+
+       //check promotion
+       if !allowedPromotion(field.Type, newType) {
+               panic(fmt.Sprintf("Cannot update type of column: %s: %s", 
field.Name, newType))
+       }
+
+       us.updates[field.ID].Type = newType
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema 
{
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Doc == doc {
+               return us
+       }
+
+       us.updates[field.ID].Doc = doc
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update default value of missing 
column: %s", name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.InitialDefault == defaultValue {
+               return us
+       }
+
+       us.updates[field.ID].InitialDefault = defaultValue
+       return us
+}
+
+// RequireColumn changes an optional column to required.
+func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, true)
+       return us
+}
+
+// MakeColumnOptional changes a required column to optional.
+func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, false)
+       return us
+}
+
+func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField {
+       existing := us.findField(name)
+       if existing != nil {
+               if update, ok := us.updates[existing.ID]; ok {
+                       return update
+               }
+
+               // adding to updates
+               us.updates[existing.ID] = existing
+               return existing
+       }
+
+       addedID, ok := us.addedNameToID[name]
+       if ok {
+               return us.updates[addedID]
+       }
+
+       return nil
+}
+
+func (us *UpdateSchema) Apply() *iceberg.Schema {
+       return us.applyChanges()
+}
+
+// 
══════════════════════════════INTERNAL══════════════════════════════════════║
+
+func (su *UpdateSchema) isAdded(name string) bool {
+       _, ok := su.addedNameToID[name]
+       return ok
+}
+
+func (us *UpdateSchema) internalAddColumn(parent, name string, new_id int, 
required bool, dataType iceberg.Type, doc string, initialDefaultValue any) 
*UpdateSchema {

Review Comment:
   why is this internal? I don't see any good reason to hide it.



##########
table/update_schema.go:
##########
@@ -0,0 +1,484 @@
+package table
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+)
+
+type UpdateSchema struct {
+       ops                      Operation
+       base                     *Metadata
+       schema                   *iceberg.Schema
+       idToParent               map[int]int
+       deletes                  []int
+       updates                  map[int]*iceberg.NestedField
+       parentToAddedIDs         map[int][]int
+       addedNameToID            map[string]int
+       lastColumnID             int
+       allowIncompatibleChanges bool
+       identifierFields         map[string]struct{}
+       caseSensitive            bool
+}
+
+// 
══════════════════════════════USER-FACING═══════════════════════════════════║
+
+func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, 
lastColumnID int) *UpdateSchema {
+       identifierFields := make(map[string]struct{})
+
+       return &UpdateSchema{
+               ops:                      ops,
+               base:                     base,
+               schema:                   s,
+               idToParent:               make(map[int]int),
+               deletes:                  make([]int, 0),
+               updates:                  make(map[int]*iceberg.NestedField),
+               parentToAddedIDs:         make(map[int][]int),
+               addedNameToID:            make(map[string]int),
+               lastColumnID:             lastColumnID,
+               allowIncompatibleChanges: false,
+               identifierFields:         identifierFields,
+               caseSensitive:            true,
+       }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes.
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+       us.allowIncompatibleChanges = true
+       return us
+}
+
+// AddNestedColumn adds a nested column to the schema.
+func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, 
dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn(parent, name, us.assignNewColumnID(), 
required, dataType, doc, initialDefaultValue)
+}
+
+// AddColumn adds a column to the schema.
+func (us *UpdateSchema) AddColumn(name string, required bool, dataType 
iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), required, 
dataType, doc, initialDefaultValue)
+}
+
+// AddRequiredColumn adds a required column to the schema.
+func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), true, 
dataType, doc, initialDefaultValue)
+}
+
+// AddOptionalColumn adds an optional column to the schema.
+func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, 
doc string, initialDefaultValue any) *UpdateSchema {
+       return us.internalAddColumn("", name, us.assignNewColumnID(), false, 
dataType, doc, initialDefaultValue)
+}
+
+// DeleteColumn removes a column from the schema.
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema {
+
+       field := us.findField(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot delete missing column: %s", name))
+       }
+
+       if _, ok := us.parentToAddedIDs[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has additions: 
%s", name))
+       }
+
+       if _, ok := us.updates[field.ID]; ok {
+               panic(fmt.Sprintf("Cannot delete a column that has updates: 
%s", name))
+       }
+
+       us.deletes = append(us.deletes, field.ID)
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Type.Equals(newType) {
+               return us
+       }
+
+       //check promotion
+       if !allowedPromotion(field.Type, newType) {
+               panic(fmt.Sprintf("Cannot update type of column: %s: %s", 
field.Name, newType))
+       }
+
+       us.updates[field.ID].Type = newType
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema 
{
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update type of missing column: %s", 
name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.Doc == doc {
+               return us
+       }
+
+       us.updates[field.ID].Doc = doc
+
+       return us
+}
+
+func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) 
*UpdateSchema {
+       field := us.findForUpdate(name)
+       if field == nil {
+               panic(fmt.Sprintf("Cannot update default value of missing 
column: %s", name))
+       }
+
+       for _, id := range us.deletes {
+               if id == field.ID {
+                       panic(fmt.Sprintf("Cannot update a column that will be 
deleted: %s", field.Name))
+               }
+       }
+
+       if field.InitialDefault == defaultValue {
+               return us
+       }
+
+       us.updates[field.ID].InitialDefault = defaultValue
+       return us
+}
+
+// RequireColumn changes an optional column to required.
+func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, true)
+       return us
+}
+
+// MakeColumnOptional changes a required column to optional.
+func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema {
+       us.internalUpdateColumnRequirement(name, false)
+       return us
+}
+
+func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField {
+       existing := us.findField(name)
+       if existing != nil {
+               if update, ok := us.updates[existing.ID]; ok {
+                       return update
+               }
+
+               // adding to updates
+               us.updates[existing.ID] = existing
+               return existing
+       }
+
+       addedID, ok := us.addedNameToID[name]
+       if ok {
+               return us.updates[addedID]
+       }
+
+       return nil
+}
+
+func (us *UpdateSchema) Apply() *iceberg.Schema {
+       return us.applyChanges()
+}
+
+// 
══════════════════════════════INTERNAL══════════════════════════════════════║
+
+func (su *UpdateSchema) isAdded(name string) bool {
+       _, ok := su.addedNameToID[name]
+       return ok
+}
+
+func (us *UpdateSchema) internalAddColumn(parent, name string, new_id int, 
required bool, dataType iceberg.Type, doc string, initialDefaultValue any) 
*UpdateSchema {

Review Comment:
   same for all other methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to