zeroshade commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2255039668
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
Review Comment:
`NewUpdateSchema` is exported, so it needs a doc comment.
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+ // Clear the previous errors
+ // because if validation is called multiple times, the errors will
accumulate even after removing operations causing error
+ us.errors = us.errors[:0]
+
+ settings := &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ }
+
+ for _, op := range us.operations {
+ if err := op.Validate(us.schema, settings); err != nil {
+ us.errors = append(us.errors, err)
+ }
+ }
+
+ if len(us.errors) > 0 {
+ return fmt.Errorf("validation failed with %d errors: %v",
len(us.errors), us.errors)
+ }
+ return nil
Review Comment:
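`return errors.Join(us.errors...)` is much more concise and simple :smile: (`errors.Join` is variadic and returns nil when there are no errors, so the `len` check is not needed either)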
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// SchemaOperation represents a deferred schema operation that is validated and then applied to a schemaBuilder
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
+// SchemaOperation interface and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
+// operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+// ColumnUpdate describes the changes an UpdateColumn operation should make to
+// a column. Fields that are left unset are not modified.
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // applied only when Type.Valid is true
+ Doc iceberg.Optional[string] // applied only when Doc.Valid is true
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // applied only when Required.Valid is true
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+// NewUpdateSchema returns an UpdateSchema that queues schema changes against
+// the base schema s on behalf of txn.
+//
+// lastColumnID seeds field-ID allocation: each column added during Build gets
+// the next ID after it. The returned value starts with no queued operations,
+// case-sensitive field lookups, and incompatible changes disallowed; use
+// SetCaseSensitive and AllowIncompatibleChanges to change those settings.
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int) *UpdateSchema {
+	return &UpdateSchema{
+		txn:                      txn,
+		schema:                   s,
+		lastColumnID:             lastColumnID,
+		operations:               make([]SchemaOperation, 0),
+		errors:                   make([]error, 0),
+		allowIncompatibleChanges: false,
+		identifierFields:         make(map[string]struct{}),
+		caseSensitive:            true,
+	}
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and any collected validation errors
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns the String description of each queued operation, in queue order
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs the deferred validation of every queued operation against the
+// base schema, without building a new schema.
+//
+// Errors collected by a previous call are discarded first, so the result only
+// reflects the operations that are currently queued. If any operation is
+// invalid, the returned error wraps all individual failures (combined with
+// errors.Join), so callers can still inspect them with errors.Is / errors.As.
+// It returns nil when every operation is valid or nothing is queued.
+func (us *UpdateSchema) Validate() error {
+	// Start from a clean slate: repeated calls must not keep reporting errors
+	// for operations that have since been removed.
+	us.errors = us.errors[:0]
+
+	settings := &validationSettings{
+		allowIncompatibleChanges: us.allowIncompatibleChanges,
+		caseSensitive:            us.caseSensitive,
+	}
+
+	for _, op := range us.operations {
+		if err := op.Validate(us.schema, settings); err != nil {
+			us.errors = append(us.errors, err)
+		}
+	}
+
+	if len(us.errors) == 0 {
+		return nil
+	}
+
+	// Wrap with %w instead of printing the slice with %v so the individual
+	// errors stay reachable through the returned error's chain.
+	return fmt.Errorf("validation failed with %d errors: %w", len(us.errors), errors.Join(us.errors...))
+}
+
+// Build validates and constructs the final schema
+func (us *UpdateSchema) Build() (*iceberg.Schema, error) {
+ // First validate all operations
+ if err := us.Validate(); err != nil {
+ return nil, err
+ }
+
+ // No operations means no changes
+ if len(us.operations) == 0 {
+ return us.schema, nil
+ }
+
+ // Create builder and apply operations
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ // Apply operations in order
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, fmt.Errorf("failed to apply operation %s:
%w", op.String(), err)
+ }
+ }
+
+ // Build final schema
+ return us.buildFinalSchema(builder)
+}
+
+// Commit validates, builds, and commits the schema changes
+func (us *UpdateSchema) Commit() error {
+ updates, requirements, err := us.CommitUpdates()
+ if err != nil {
+ return err
+ }
+
+ if len(updates) == 0 {
+ return nil
+ }
+
+ return us.txn.apply(updates, requirements)
+}
+
+// CommitUpdates returns the updates and requirements needed
+func (us *UpdateSchema) CommitUpdates() ([]Update, []Requirement, error) {
+ // If there are no operations, return nil updates and requirements
+ if len(us.operations) == 0 {
+ return nil, nil, nil
+ }
+
+ newSchema, err := us.Build()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Check if equivalent schema exists
+ existingSchemaID := us.findExistingSchemaInTransaction(newSchema)
+
+ var updates []Update
+ var requirements []Requirement
+
+ // for Commit Contention
+ requirements = append(requirements, AssertCurrentSchemaID(us.schema.ID))
+
+ if existingSchemaID == nil {
+ // Get the final lastColumnID from the builder
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges:
us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, nil, fmt.Errorf("failed to
recompute lastColumnID: %w", err)
+ }
+ }
+ updates = append(updates,
+ NewAddSchemaUpdate(newSchema, builder.lastColumnID,
false),
+ NewSetCurrentSchemaUpdate(newSchema.ID),
+ )
+ } else {
+ updates = append(updates,
NewSetCurrentSchemaUpdate(*existingSchemaID))
+ }
+
+ if nameMapUpdates := us.getNameMappingUpdates(newSchema);
len(nameMapUpdates) > 0 {
+ updates = append(updates, nameMapUpdates...)
+ }
+
+ return updates, requirements, nil
+}
+
+/// AddColumn Operation
+
+// Validate implements SchemaOperation for addColumnOp. It checks that the
+// column described by op can be added to schema.
+//
+// It returns an error when the path is empty, the data type is missing, the
+// name already exists, the parent is missing or is not a struct, map or list,
+// the column is required but has no initial default (unless incompatible
+// changes are allowed), or validateDefaultValue rejects the initial default
+// for the column type.
+func (op *addColumnOp) Validate(schema *iceberg.Schema, settings *validationSettings) error {
+	if len(op.path) == 0 {
+		return errors.New("AddColumn: path must contain at least the new column name")
+	}
+
+	// A nil type cannot be checked against a default value or turned into a
+	// field, so reject it here instead of panicking later.
+	if op.dataType == nil {
+		return fmt.Errorf("cannot add column without a data type: %s", strings.Join(op.path, "."))
+	}
+
+	// Check if field already exists
+	if existing := findField(schema, op.path, settings.caseSensitive); existing != nil {
+		return fmt.Errorf("cannot add column; name already exists: %s", strings.Join(op.path, "."))
+	}
+
+	// Validate parent exists and is a nested type
+	if len(op.path) > 1 {
+		parentPath := op.path[:len(op.path)-1]
+		pf := findField(schema, parentPath, settings.caseSensitive)
+		if pf == nil {
+			return fmt.Errorf("cannot find parent struct: %s", strings.Join(parentPath, "."))
+		}
+
+		switch pf.Type.(type) {
+		case *iceberg.StructType, *iceberg.MapType, *iceberg.ListType:
+			// Valid parent types
+		default:
+			return fmt.Errorf("parent is not a nested type: %s", strings.Join(parentPath, "."))
+		}
+	}
+
+	// Validate required column has default or incompatible changes are allowed
+	if op.required && op.initialDefault == nil && !settings.allowIncompatibleChanges {
+		return fmt.Errorf("cannot add required column without default value: %s", strings.Join(op.path, "."))
+	}
+
+	// Validate default value type compatibility
+	if op.initialDefault != nil {
+		if err := validateDefaultValue(op.dataType, op.initialDefault); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (op *addColumnOp) Apply(builder *schemaBuilder) error {
+ colName := op.path[len(op.path)-1]
+ parentID := -1
+
+ if len(op.path) > 1 {
+ parentPath := op.path[:len(op.path)-1]
+ pf := findField(builder.baseSchema, parentPath,
builder.settings.caseSensitive)
+ if pf == nil {
+ return fmt.Errorf("cannot find parent struct: %s",
strings.Join(parentPath, "."))
+ }
+
+ switch nt := pf.Type.(type) {
+ case *iceberg.StructType:
+ parentID = pf.ID
+ case *iceberg.MapType:
+ vf := nt.ValueField()
+ parentID = vf.ID
+ case *iceberg.ListType:
+ ef := nt.ElementField()
+ parentID = ef.ID
+ default:
+ return fmt.Errorf("parent is not a nested type: %s",
strings.Join(parentPath, "."))
+ }
+ }
+
+ newID := builder.assignNewColumnID()
+ nf := &iceberg.NestedField{
+ Name: colName,
+ ID: newID,
+ Required: op.required,
+ Type: op.dataType,
+ Doc: op.doc,
+ InitialDefault: op.initialDefault,
+ }
+
+ builder.adds[parentID] = append(builder.adds[parentID], nf)
+ return nil
+}
+
+// String describes the queued add-column operation for diagnostics.
+//
+// A missing data type is rendered as "<nil>" instead of being dereferenced:
+// String can be called (for example through GetQueuedOperations) before the
+// operation has been validated.
+func (op *addColumnOp) String() string {
+	typeName := "<nil>"
+	if op.dataType != nil {
+		typeName = op.dataType.String()
+	}
+
+	return fmt.Sprintf("AddColumn(%s, required=%t, type=%s)",
+		strings.Join(op.path, "."), op.required, typeName)
+}
+
+/// UpdateColumn Operation
+
+// Implementation for updateColumnOp
+func (op *updateColumnOp) Validate(schema *iceberg.Schema, settings
*validationSettings) error {
+ field := findField(schema, op.path, settings.caseSensitive)
+ if field == nil {
+ return fmt.Errorf("cannot update missing column: %s",
strings.Join(op.path, "."))
+ }
+
+ // Type promotion validation
+ if op.updates.Type.Valid && !field.Type.Equals(op.updates.Type.Val) {
+ _, err := iceberg.PromoteType(field.Type, op.updates.Type.Val)
+ if err != nil {
+ return fmt.Errorf("cannot update type of column: %s: %s
-> %s: %w",
+ strings.Join(op.path, "."),
field.Type.String(), op.updates.Type.Val.String(), err)
+ }
+ }
+
+ // Required flag validation
+ if op.updates.Required.Valid && field.Required !=
op.updates.Required.Val {
+ isRequired := op.updates.Required.Val
+ if isRequired && !settings.allowIncompatibleChanges {
+ return fmt.Errorf("cannot change column nullability:
%s: optional -> required",
+ strings.Join(op.path, "."))
+ }
+ }
+
+ return nil
+}
+
+func (op *updateColumnOp) Apply(builder *schemaBuilder) error {
+ field := findField(builder.baseSchema, op.path,
builder.settings.caseSensitive)
+ if field == nil {
+ return fmt.Errorf("cannot update missing column: %s",
strings.Join(op.path, "."))
+ }
+
+ // Create a copy of the field for modification
+ updatedField := *field
+ hasChanges := false
+
+ // Update type if provided
+ if op.updates.Type.Valid && !field.Type.Equals(op.updates.Type.Val) {
+ newType, err := iceberg.PromoteType(field.Type,
op.updates.Type.Val)
+ if err != nil {
+ return fmt.Errorf("cannot update type of column: %s: %s
-> %s: %w",
+ strings.Join(op.path, "."),
field.Type.String(), op.updates.Type.Val.String(), err)
+ }
+ updatedField.Type = newType
+ hasChanges = true
+ }
+
+ // Update documentation if provided
+ if op.updates.Doc.Valid && field.Doc != op.updates.Doc.Val {
+ updatedField.Doc = op.updates.Doc.Val
+ hasChanges = true
+ }
+
+ // Update default value if provided
+ if op.updates.Default != nil && field.InitialDefault !=
op.updates.Default {
+ updatedField.InitialDefault = op.updates.Default
+ hasChanges = true
+ }
+
+ // Update required flag if provided
+ if op.updates.Required.Valid && field.Required !=
op.updates.Required.Val {
+ updatedField.Required = op.updates.Required.Val
+ hasChanges = true
+ }
+
+ // Only update the map if changes were made
+ if hasChanges {
+ builder.updates[field.ID] = &updatedField
+ }
+
+ return nil
+}
+
+func (op *updateColumnOp) String() string {
+ var changes []string
+ if op.updates.Type.Valid {
+ changes = append(changes, fmt.Sprintf("type=%s",
op.updates.Type.Val.String()))
+ }
+ if op.updates.Doc.Valid {
+ changes = append(changes, fmt.Sprintf("doc=%s",
op.updates.Doc.Val))
+ }
+ if op.updates.Default != nil {
+ changes = append(changes, "default=<value>")
+ }
+ if op.updates.Required.Valid {
+ changes = append(changes, fmt.Sprintf("required=%t",
op.updates.Required.Val))
+ }
+ return fmt.Sprintf("UpdateColumn(%s, %s)",
+ strings.Join(op.path, "."), strings.Join(changes, ", "))
+}
+
+/// DeleteColumn Operation
+
+// Implementation for deleteColumnOp
+func (op *deleteColumnOp) Validate(schema *iceberg.Schema, settings
*validationSettings) error {
+ field := findField(schema, op.path, settings.caseSensitive)
+ if field == nil {
+ return fmt.Errorf("cannot delete missing column: %s",
strings.Join(op.path, "."))
+ }
+ return nil
+}
+
+func (op *deleteColumnOp) Apply(builder *schemaBuilder) error {
+ field := findField(builder.baseSchema, op.path,
builder.settings.caseSensitive)
+ if field == nil {
+ return fmt.Errorf("cannot delete missing column: %s",
strings.Join(op.path, "."))
+ }
+
+ // Check for conflicts with adds/updates
+ if _, ok := builder.adds[field.ID]; ok {
+ return fmt.Errorf("cannot delete a column that has additions:
%s", strings.Join(op.path, "."))
+ }
+
+ if _, ok := builder.updates[field.ID]; ok {
+ return fmt.Errorf("cannot delete a column that has updates:
%s", strings.Join(op.path, "."))
+ }
+
+ builder.deletes[field.ID] = struct{}{}
+ return nil
+}
+
+func (op *deleteColumnOp) String() string {
+ return fmt.Sprintf("DeleteColumn(%s)", strings.Join(op.path, "."))
+}
+
+/// MoveColumn Operation
+
+// Implementation for moveColumnOp
+func (op *moveColumnOp) Validate(schema *iceberg.Schema, settings
*validationSettings) error {
+ // Validate column to move exists
+ colField := findField(schema, op.columnToMove, settings.caseSensitive)
+ if colField == nil {
+ return fmt.Errorf("cannot move missing column: %s",
strings.Join(op.columnToMove, "."))
+ }
+
+ // Validate reference column for before/after operations
+ if op.op == OpBefore || op.op == OpAfter {
+ other := findField(schema, op.referenceColumn,
settings.caseSensitive)
+ if other == nil {
+ return fmt.Errorf("reference column for move not found:
%s", strings.Join(op.referenceColumn, "."))
+ }
+
+ // Check same parent
+ colParentID := parentIDForPath(schema, op.columnToMove,
settings.caseSensitive)
+ refParentID := parentIDForPath(schema, op.referenceColumn,
settings.caseSensitive)
+ if colParentID != refParentID {
+ return errors.New("cannot move column across different
parent structs")
+ }
+
+ if other.ID == colField.ID {
+ return errors.New("cannot move column relative to
itself")
+ }
+ }
+
+ return nil
+}
+
+func (op *moveColumnOp) Apply(builder *schemaBuilder) error {
+ colField := findField(builder.baseSchema, op.columnToMove,
builder.settings.caseSensitive)
+ if colField == nil {
+ return fmt.Errorf("cannot move missing column: %s",
strings.Join(op.columnToMove, "."))
+ }
+
+ parentID := parentIDForPath(builder.baseSchema, op.columnToMove,
builder.settings.caseSensitive)
+
+ var otherID int
+ if op.op == OpBefore || op.op == OpAfter {
+ other := findField(builder.baseSchema, op.referenceColumn,
builder.settings.caseSensitive)
+ if other == nil {
+ return fmt.Errorf("reference column for move not found:
%s", strings.Join(op.referenceColumn, "."))
+ }
+ otherID = other.ID
+ }
+
+ builder.moves[parentID] = append(builder.moves[parentID], moveReq{
+ fieldID: colField.ID,
+ otherFieldID: otherID,
+ op: op.op,
+ })
+
+ return nil
+}
+
+func (op *moveColumnOp) String() string {
+ if op.op == OpFirst {
+ return fmt.Sprintf("Move(%s, %s)",
strings.Join(op.columnToMove, "."), op.op)
+ }
+ return fmt.Sprintf("Move(%s, %s %s)",
+ strings.Join(op.columnToMove, "."), op.op,
strings.Join(op.referenceColumn, "."))
+}
+
+// Helper methods for schemaBuilder
+func (b *schemaBuilder) assignNewColumnID() int {
+ next := b.lastColumnID + 1
+ b.lastColumnID = next
+ return next
+}
+
+// Build the final schema using existing logic
+func (us *UpdateSchema) buildFinalSchema(builder *schemaBuilder)
(*iceberg.Schema, error) {
+ newFields, err := rebuild(us.schema.AsStruct().FieldList, -1, builder)
+ if err != nil {
+ return nil, err
+ }
+
+ idList := us.schema.IdentifierFieldIDs
+ newID := us.schema.ID + 1
+
+ return iceberg.NewSchemaWithIdentifiers(newID, idList, newFields...),
nil
+}
+
+// Rebuild schema with builder state
+func rebuild(fields []iceberg.NestedField, parentID int, builder
*schemaBuilder) ([]iceberg.NestedField, error) {
+ var out []iceberg.NestedField
+
+ // iterate over the current fields to apply updates and deletes
+ for _, f := range fields {
+ if _, gone := builder.deletes[f.ID]; gone {
+ continue
+ }
+ if upd, ok := builder.updates[f.ID]; ok {
+ f = *upd
+ }
+
+ switch t := f.Type.(type) {
+ case *iceberg.StructType:
+ fields, err := rebuild(t.Fields(), f.ID, builder)
+ if err != nil {
+ return nil, fmt.Errorf("error rebuilding struct
type: %w", err)
+ }
+ f.Type = &iceberg.StructType{FieldList: fields}
+
+ case *iceberg.ListType:
+ el := t.ElementField()
+ fields, err := rebuild([]iceberg.NestedField{el},
el.ID, builder)
+ if err != nil {
+ return nil, fmt.Errorf("error rebuilding list
type: %w", err)
+ }
+ el = fields[0]
+ f.Type = &iceberg.ListType{
+ ElementID: el.ID,
+ Element: el.Type,
+ ElementRequired: el.Required,
+ }
+
+ case *iceberg.MapType:
+ val := t.ValueField()
+ fields, err := rebuild([]iceberg.NestedField{val},
val.ID, builder)
+ if err != nil {
+ return nil, fmt.Errorf("error rebuilding map
type: %w", err)
+ }
+ val = fields[0]
+ f.Type = &iceberg.MapType{
+ KeyID: t.KeyField().ID,
+ KeyType: t.KeyField().Type,
+ ValueID: val.ID,
+ ValueType: val.Type,
+ ValueRequired: val.Required,
+ }
+ }
+ out = append(out, f)
+ }
+
+ // append new children for this parent id (-1 means root)
+ for _, nf := range builder.adds[parentID] {
+ out = append(out, *nf)
+ }
+
+ // check if there are any moves for this parent id (-1 means root)
+ if reqs := builder.moves[parentID]; len(reqs) > 0 {
+ var err error
+ out, err = reorder(out, reqs)
+ if err != nil {
+ return nil, fmt.Errorf("error reordering fields: %w",
err)
+ }
+ }
+
+ return out, nil
+}
+
+// Reorder fields based on move operations
+func reorder(fields []iceberg.NestedField, reqs []moveReq)
([]iceberg.NestedField, error) {
+ // find the index of a field by its id
+ indexOf := func(id int) int {
+ for i, f := range fields {
+ if f.ID == id {
+ return i
+ }
+ }
+ return -1
+ }
+
+ for _, m := range reqs {
+ pos := indexOf(m.fieldID)
+ if pos == -1 {
+ continue // field might have been deleted
+ }
+
+ f := fields[pos]
+ fields = append(fields[:pos], fields[pos+1:]...)
+
+ switch m.op {
+ case OpFirst:
+ fields = append([]iceberg.NestedField{f}, fields...)
+ case OpBefore:
+ idx := indexOf(m.otherFieldID)
+ if idx == -1 {
+ return nil, errors.New("move-before target not
found at commit time")
+ }
+ fields = append(fields[:idx],
+ append([]iceberg.NestedField{f},
fields[idx:]...)...)
+ case OpAfter:
+ idx := indexOf(m.otherFieldID)
+ if idx == -1 {
+ return nil, errors.New("move-after target not
found at commit time")
+ }
+ fields = append(fields[:idx+1],
+ append([]iceberg.NestedField{f},
fields[idx+1:]...)...)
+ default:
+ return nil, fmt.Errorf("unknown move op: %s", m.op)
+ }
+ }
+
+ return fields, nil
+}
+
+// Helper functions
+func findField(schema *iceberg.Schema, path []string, caseSensitive bool)
*iceberg.NestedField {
+ name := strings.Join(path, ".")
+
+ var field iceberg.NestedField
+ var ok bool
+
+ if caseSensitive {
+ field, ok = schema.FindFieldByName(name)
+ } else {
+ field, ok = schema.FindFieldByNameCaseInsensitive(name)
+ }
+
+ if !ok {
+ return nil
+ }
+
+ return &field
+}
+
+// parentIDForPath returns the ID of the field addressed by all but the last
+// element of path, looked up in schema with the given case sensitivity.
+//
+// It returns -1 when path has at most one element (a root-level column) or
+// when the parent path cannot be resolved.
+func parentIDForPath(schema *iceberg.Schema, path []string, caseSensitive bool) int {
+	// <= 1 rather than == 1: an empty path would otherwise reach
+	// path[:len(path)-1] below and panic on the negative slice bound.
+	if len(path) <= 1 {
+		return -1
+	}
+
+	if f := findField(schema, path[:len(path)-1], caseSensitive); f != nil {
+		return f.ID
+	}
+
+	return -1
+}
+
+func validateDefaultValue(typ iceberg.Type, val any) error {
+ // Defaults are only allowed on primitive columns
+ prim, ok := typ.(iceberg.PrimitiveType)
+ if !ok {
+ return fmt.Errorf("defaults are only allowed on primitive
columns, got %s", typ.Type())
+ }
+
+ lit, err := iceberg.LiteralFromAny(val)
+ if err != nil {
+ return err
+ }
+
+ litType := lit.Type()
+
+ // Exact match ?
+ if litType.Equals(prim) {
+ return nil
+ }
+
+ return fmt.Errorf("default literal of type %s is not assignable to of
type %s", litType.String(), prim.Type())
+}
+
+func (us *UpdateSchema) findExistingSchemaInTransaction(newSchema
*iceberg.Schema) *int {
+ for _, schema := range us.txn.tbl.metadata.Schemas() {
+ if newSchema.Equals(schema) {
+ return &schema.ID
+ }
+ }
+ return nil
+}
+
+func (us *UpdateSchema) getNameMappingUpdates(newSchema *iceberg.Schema)
[]Update {
+ // Only update name mapping if we have adds/updates that might need it
+ hasAdds := false
+ hasUpdates := false
Review Comment:
we don't behave any differently based on `hasAdds` vs `hasUpdates`, so
just use one variable
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
Review Comment:
I agree here, I don't think there's any good use for the `Reset` or
`RemoveLastOperation` methods.
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+ // Clear the previous errors
+ // because if validation is called multiple times, the errors will
accumulate even after removing operations causing error
+ us.errors = us.errors[:0]
+
+ settings := &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ }
Review Comment:
Why use a pointer for this? Just take `validationSettings` instead of a
`*validationSettings`.
##########
table/table_test.go:
##########
@@ -1420,5 +1657,4 @@ func (t *TableWritingTestSuite)
TestDeleteOldMetadataNoErrorLogsOnFileFound() {
// validate that no error is logged
logOutput := logBuf.String()
t.NotContains(logOutput, "Warning: Failed to delete old metadata file")
- t.NotContains(logOutput, "no such file or directory")
Review Comment:
why are we removing this?
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+ // Clear the previous errors
+ // because if validation is called multiple times, the errors will
accumulate even after removing operations causing error
+ us.errors = us.errors[:0]
+
+ settings := &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ }
+
+ for _, op := range us.operations {
+ if err := op.Validate(us.schema, settings); err != nil {
+ us.errors = append(us.errors, err)
+ }
+ }
+
+ if len(us.errors) > 0 {
+ return fmt.Errorf("validation failed with %d errors: %v",
len(us.errors), us.errors)
+ }
+ return nil
+}
+
+// Build validates and constructs the final schema
+func (us *UpdateSchema) Build() (*iceberg.Schema, error) {
+ // First validate all operations
+ if err := us.Validate(); err != nil {
+ return nil, err
+ }
+
+ // No operations means no changes
+ if len(us.operations) == 0 {
+ return us.schema, nil
+ }
Review Comment:
Should we do this *before* calling `Validate()`?
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+ // Clear the previous errors
+ // because if validation is called multiple times, the errors will
accumulate even after removing operations causing error
+ us.errors = us.errors[:0]
+
+ settings := &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ }
+
+ for _, op := range us.operations {
+ if err := op.Validate(us.schema, settings); err != nil {
+ us.errors = append(us.errors, err)
+ }
+ }
+
+ if len(us.errors) > 0 {
+ return fmt.Errorf("validation failed with %d errors: %v",
len(us.errors), us.errors)
+ }
+ return nil
+}
+
+// Build validates and constructs the final schema
+func (us *UpdateSchema) Build() (*iceberg.Schema, error) {
+ // First validate all operations
+ if err := us.Validate(); err != nil {
+ return nil, err
+ }
+
+ // No operations means no changes
+ if len(us.operations) == 0 {
+ return us.schema, nil
+ }
+
+ // Create builder and apply operations
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ // Apply operations in order
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, fmt.Errorf("failed to apply operation %s:
%w", op.String(), err)
+ }
+ }
+
+ // Build final schema
+ return us.buildFinalSchema(builder)
+}
+
+// Commit validates, builds, and commits the schema changes
+func (us *UpdateSchema) Commit() error {
+ updates, requirements, err := us.CommitUpdates()
+ if err != nil {
+ return err
+ }
+
+ if len(updates) == 0 {
+ return nil
+ }
+
+ return us.txn.apply(updates, requirements)
+}
+
+// CommitUpdates returns the updates and requirements needed
+func (us *UpdateSchema) CommitUpdates() ([]Update, []Requirement, error) {
+ // If there are no operations, return nil updates and requirements
+ if len(us.operations) == 0 {
+ return nil, nil, nil
+ }
+
+ newSchema, err := us.Build()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Check if equivalent schema exists
+ existingSchemaID := us.findExistingSchemaInTransaction(newSchema)
+
+ var updates []Update
+ var requirements []Requirement
+
+ // for Commit Contention
+ requirements = append(requirements, AssertCurrentSchemaID(us.schema.ID))
+
+ if existingSchemaID == nil {
+ // Get the final lastColumnID from the builder
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges:
us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, nil, fmt.Errorf("failed to
recompute lastColumnID: %w", err)
+ }
+ }
+ updates = append(updates,
+ NewAddSchemaUpdate(newSchema, builder.lastColumnID,
false),
+ NewSetCurrentSchemaUpdate(newSchema.ID),
+ )
Review Comment:
This seems like a waste to have to *rebuild* the schema again. Can we
somehow store/update the `lastColumnID` after calling `Build()` rather than
redoing the build here?
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
Review Comment:
Since these methods operate only on internal types, why are we bothering to
export this type?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]