laskoviymishka commented on code in PR #431: URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2126896000
########## table/update_schema.go: ########## @@ -0,0 +1,484 @@ +package table + +import ( + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSchema struct { + ops Operation + base *Metadata + schema *iceberg.Schema + idToParent map[int]int + deletes []int + updates map[int]*iceberg.NestedField + parentToAddedIDs map[int][]int + addedNameToID map[string]int + lastColumnID int + allowIncompatibleChanges bool + identifierFields map[string]struct{} + caseSensitive bool +} + +// ══════════════════════════════USER-FACING═══════════════════════════════════║ Review Comment: no need to specify it, it's already obvious from the code itself. ########## table/update_schema.go: ########## @@ -0,0 +1,484 @@ +package table + +import ( + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSchema struct { + ops Operation + base *Metadata + schema *iceberg.Schema + idToParent map[int]int + deletes []int + updates map[int]*iceberg.NestedField + parentToAddedIDs map[int][]int + addedNameToID map[string]int + lastColumnID int + allowIncompatibleChanges bool + identifierFields map[string]struct{} + caseSensitive bool +} + +// ══════════════════════════════USER-FACING═══════════════════════════════════║ + +func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, lastColumnID int) *UpdateSchema { + identifierFields := make(map[string]struct{}) + + return &UpdateSchema{ + ops: ops, + base: base, + schema: s, + idToParent: make(map[int]int), + deletes: make([]int, 0), + updates: make(map[int]*iceberg.NestedField), + parentToAddedIDs: make(map[int][]int), + addedNameToID: make(map[string]int), + lastColumnID: lastColumnID, + allowIncompatibleChanges: false, + identifierFields: identifierFields, + caseSensitive: true, + } +} + +// AllowIncompatibleChanges permits incompatible schema changes. +func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema { + us.allowIncompatibleChanges = true + return us +} + +// AddNestedColumn adds a nested column to the schema. 
+func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn(parent, name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddColumn adds a column to the schema. +func (us *UpdateSchema) AddColumn(name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddRequiredColumn adds a required column to the schema. +func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), true, dataType, doc, initialDefaultValue) +} + +// AddOptionalColumn adds an optional column to the schema. +func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), false, dataType, doc, initialDefaultValue) +} + +// DeleteColumn removes a column from the schema. 
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema { + + field := us.findField(name) + if field == nil { + panic(fmt.Sprintf("Cannot delete missing column: %s", name)) + } + + if _, ok := us.parentToAddedIDs[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has additions: %s", name)) + } + + if _, ok := us.updates[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has updates: %s", name)) + } + + us.deletes = append(us.deletes, field.ID) + + return us +} + +func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Type.Equals(newType) { + return us + } + + //check promotion + if !allowedPromotion(field.Type, newType) { + panic(fmt.Sprintf("Cannot update type of column: %s: %s", field.Name, newType)) + } + + us.updates[field.ID].Type = newType + return us +} + +func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Doc == doc { + return us + } + + us.updates[field.ID].Doc = doc + + return us +} + +func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update default value of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if 
field.InitialDefault == defaultValue { + return us + } + + us.updates[field.ID].InitialDefault = defaultValue + return us +} + +// RequireColumn changes an optional column to required. +func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, true) + return us +} + +// MakeColumnOptional changes a required column to optional. +func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, false) + return us +} + +func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField { + existing := us.findField(name) + if existing != nil { + if update, ok := us.updates[existing.ID]; ok { + return update + } + + // adding to updates + us.updates[existing.ID] = existing + return existing + } + + addedID, ok := us.addedNameToID[name] + if ok { + return us.updates[addedID] + } + + return nil +} + +func (us *UpdateSchema) Apply() *iceberg.Schema { + return us.applyChanges() +} + +// ══════════════════════════════INTERNAL══════════════════════════════════════║ Review Comment: no need to specify it, it's already visible from the language itself. ########## table/update_schema.go: ########## @@ -0,0 +1,484 @@ +package table + +import ( + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSchema struct { + ops Operation + base *Metadata + schema *iceberg.Schema + idToParent map[int]int + deletes []int + updates map[int]*iceberg.NestedField + parentToAddedIDs map[int][]int + addedNameToID map[string]int + lastColumnID int + allowIncompatibleChanges bool + identifierFields map[string]struct{} + caseSensitive bool +} + +// ══════════════════════════════USER-FACING═══════════════════════════════════║ + +func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, lastColumnID int) *UpdateSchema { + identifierFields := make(map[string]struct{}) + + return &UpdateSchema{ + ops: ops, + base: base, + schema: s, + idToParent: make(map[int]int), + deletes: make([]int, 
0), + updates: make(map[int]*iceberg.NestedField), + parentToAddedIDs: make(map[int][]int), + addedNameToID: make(map[string]int), + lastColumnID: lastColumnID, + allowIncompatibleChanges: false, + identifierFields: identifierFields, + caseSensitive: true, + } +} + +// AllowIncompatibleChanges permits incompatible schema changes. +func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema { + us.allowIncompatibleChanges = true + return us +} + +// AddNestedColumn adds a nested column to the schema. +func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn(parent, name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddColumn adds a column to the schema. +func (us *UpdateSchema) AddColumn(name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddRequiredColumn adds a required column to the schema. +func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), true, dataType, doc, initialDefaultValue) +} + +// AddOptionalColumn adds an optional column to the schema. +func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), false, dataType, doc, initialDefaultValue) +} + +// DeleteColumn removes a column from the schema. 
+func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema { + + field := us.findField(name) + if field == nil { + panic(fmt.Sprintf("Cannot delete missing column: %s", name)) + } + + if _, ok := us.parentToAddedIDs[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has additions: %s", name)) + } + + if _, ok := us.updates[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has updates: %s", name)) + } + + us.deletes = append(us.deletes, field.ID) + + return us +} + +func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Type.Equals(newType) { + return us + } + + //check promotion + if !allowedPromotion(field.Type, newType) { + panic(fmt.Sprintf("Cannot update type of column: %s: %s", field.Name, newType)) + } + + us.updates[field.ID].Type = newType + return us +} + +func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Doc == doc { + return us + } + + us.updates[field.ID].Doc = doc + + return us +} + +func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update default value of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if 
field.InitialDefault == defaultValue { + return us + } + + us.updates[field.ID].InitialDefault = defaultValue + return us +} + +// RequireColumn changes an optional column to required. +func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, true) + return us +} + +// MakeColumnOptional changes a required column to optional. +func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, false) + return us +} + +func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField { + existing := us.findField(name) + if existing != nil { + if update, ok := us.updates[existing.ID]; ok { + return update + } + + // adding to updates + us.updates[existing.ID] = existing + return existing + } + + addedID, ok := us.addedNameToID[name] + if ok { + return us.updates[addedID] + } + + return nil +} + +func (us *UpdateSchema) Apply() *iceberg.Schema { + return us.applyChanges() +} + +// ══════════════════════════════INTERNAL══════════════════════════════════════║ + +func (su *UpdateSchema) isAdded(name string) bool { + _, ok := su.addedNameToID[name] + return ok +} + +func (us *UpdateSchema) internalAddColumn(parent, name string, new_id int, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { Review Comment: why is this internal? I don't see any good reason to hide it. 
########## table/update_schema.go: ########## @@ -0,0 +1,484 @@ +package table + +import ( + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSchema struct { + ops Operation + base *Metadata + schema *iceberg.Schema + idToParent map[int]int + deletes []int + updates map[int]*iceberg.NestedField + parentToAddedIDs map[int][]int + addedNameToID map[string]int + lastColumnID int + allowIncompatibleChanges bool + identifierFields map[string]struct{} + caseSensitive bool +} + +// ══════════════════════════════USER-FACING═══════════════════════════════════║ + +func NewUpdateSchema(ops Operation, base *Metadata, s *iceberg.Schema, lastColumnID int) *UpdateSchema { + identifierFields := make(map[string]struct{}) + + return &UpdateSchema{ + ops: ops, + base: base, + schema: s, + idToParent: make(map[int]int), + deletes: make([]int, 0), + updates: make(map[int]*iceberg.NestedField), + parentToAddedIDs: make(map[int][]int), + addedNameToID: make(map[string]int), + lastColumnID: lastColumnID, + allowIncompatibleChanges: false, + identifierFields: identifierFields, + caseSensitive: true, + } +} + +// AllowIncompatibleChanges permits incompatible schema changes. +func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema { + us.allowIncompatibleChanges = true + return us +} + +// AddNestedColumn adds a nested column to the schema. +func (us *UpdateSchema) AddNestedColumn(parent, name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn(parent, name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddColumn adds a column to the schema. +func (us *UpdateSchema) AddColumn(name string, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), required, dataType, doc, initialDefaultValue) +} + +// AddRequiredColumn adds a required column to the schema. 
+func (us *UpdateSchema) AddRequiredColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), true, dataType, doc, initialDefaultValue) +} + +// AddOptionalColumn adds an optional column to the schema. +func (us *UpdateSchema) AddOptionalColumn(name string, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { + return us.internalAddColumn("", name, us.assignNewColumnID(), false, dataType, doc, initialDefaultValue) +} + +// DeleteColumn removes a column from the schema. +func (us *UpdateSchema) DeleteColumn(name string) *UpdateSchema { + + field := us.findField(name) + if field == nil { + panic(fmt.Sprintf("Cannot delete missing column: %s", name)) + } + + if _, ok := us.parentToAddedIDs[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has additions: %s", name)) + } + + if _, ok := us.updates[field.ID]; ok { + panic(fmt.Sprintf("Cannot delete a column that has updates: %s", name)) + } + + us.deletes = append(us.deletes, field.ID) + + return us +} + +func (us *UpdateSchema) UpdateColumnType(name string, newType iceberg.Type) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Type.Equals(newType) { + return us + } + + //check promotion + if !allowedPromotion(field.Type, newType) { + panic(fmt.Sprintf("Cannot update type of column: %s: %s", field.Name, newType)) + } + + us.updates[field.ID].Type = newType + return us +} + +func (us *UpdateSchema) UpdateColumnDoc(name string, doc string) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update type of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == 
field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.Doc == doc { + return us + } + + us.updates[field.ID].Doc = doc + + return us +} + +func (us *UpdateSchema) UpdateColumnDefault(name string, defaultValue any) *UpdateSchema { + field := us.findForUpdate(name) + if field == nil { + panic(fmt.Sprintf("Cannot update default value of missing column: %s", name)) + } + + for _, id := range us.deletes { + if id == field.ID { + panic(fmt.Sprintf("Cannot update a column that will be deleted: %s", field.Name)) + } + } + + if field.InitialDefault == defaultValue { + return us + } + + us.updates[field.ID].InitialDefault = defaultValue + return us +} + +// RequireColumn changes an optional column to required. +func (us *UpdateSchema) RequireColumn(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, true) + return us +} + +// MakeColumnOptional changes a required column to optional. +func (us *UpdateSchema) MakeColumnOptional(name string) *UpdateSchema { + us.internalUpdateColumnRequirement(name, false) + return us +} + +func (us *UpdateSchema) findForUpdate(name string) *iceberg.NestedField { + existing := us.findField(name) + if existing != nil { + if update, ok := us.updates[existing.ID]; ok { + return update + } + + // adding to updates + us.updates[existing.ID] = existing + return existing + } + + addedID, ok := us.addedNameToID[name] + if ok { + return us.updates[addedID] + } + + return nil +} + +func (us *UpdateSchema) Apply() *iceberg.Schema { + return us.applyChanges() +} + +// ══════════════════════════════INTERNAL══════════════════════════════════════║ + +func (su *UpdateSchema) isAdded(name string) bool { + _, ok := su.addedNameToID[name] + return ok +} + +func (us *UpdateSchema) internalAddColumn(parent, name string, new_id int, required bool, dataType iceberg.Type, doc string, initialDefaultValue any) *UpdateSchema { Review Comment: same for all other methods. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org