xixipi-lining commented on code in PR #431:
URL: https://github.com/apache/iceberg-go/pull/431#discussion_r2238276172
##########
table/update_schema.go:
##########
@@ -0,0 +1,834 @@
+package table
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSchema accumulates operations and validates on Build/Commit
+type UpdateSchema struct {
+ txn *Transaction
+ schema *iceberg.Schema
+ lastColumnID int
+
+ operations []SchemaOperation
+ errors []error // Collect validation errors
+
+ allowIncompatibleChanges bool
+ identifierFields map[string]struct{}
+ caseSensitive bool
+}
+
+// Operation represents a deferred schema operation
+type SchemaOperation interface {
+ Apply(builder *schemaBuilder) error
+ Validate(schema *iceberg.Schema, settings *validationSettings) error
+ String() string
+}
+
+// Validation settings passed to operations
+type validationSettings struct {
+ allowIncompatibleChanges bool
+ caseSensitive bool
+}
+
+type schemaBuilder struct {
+ deletes map[int]struct{}
+ updates map[int]*iceberg.NestedField
+ adds map[int][]*iceberg.NestedField
+ moves map[int][]moveReq
+ lastColumnID int
+ baseSchema *iceberg.Schema
+ settings *validationSettings
+}
+
+// Operation Types
+// Each schema update operation is represented by a struct that implements the
SchemaOperation interface
+// and has a Validate and Apply method.
+// The Validate method validates the operation and returns an error if the
operation is invalid.
+// The Apply method applies the operation to the schema builder.
+type addColumnOp struct {
+ path []string
+ required bool
+ dataType iceberg.Type
+ doc string
+ initialDefault any
+}
+
+type updateColumnOp struct {
+ path []string
+ updates ColumnUpdate
+}
+
+type deleteColumnOp struct {
+ path []string
+}
+
+type moveColumnOp struct {
+ columnToMove []string
+ referenceColumn []string
+ op moveOp
+}
+
+type ColumnUpdate struct {
+ Type iceberg.Optional[iceberg.Type] // nil means no change
+ Doc iceberg.Optional[string] // nil means no change
+ Default any // nil means no change
+ Required iceberg.Optional[bool] // nil means no change
+}
+
+// Move operation
+type moveOp string
+
+const (
+ OpFirst moveOp = "first"
+ OpBefore moveOp = "before"
+ OpAfter moveOp = "after"
+)
+
+type moveReq struct {
+ fieldID int
+ otherFieldID int // 0 when opFirst
+ op moveOp
+}
+
+func NewUpdateSchema(txn *Transaction, s *iceberg.Schema, lastColumnID int)
*UpdateSchema {
+ return &UpdateSchema{
+ txn: txn,
+ schema: s,
+ lastColumnID: lastColumnID,
+ operations: make([]SchemaOperation, 0),
+ errors: make([]error, 0),
+ allowIncompatibleChanges: false,
+ identifierFields: make(map[string]struct{}),
+ caseSensitive: true,
+ }
+}
+
+// AllowIncompatibleChanges permits incompatible schema changes
+func (us *UpdateSchema) AllowIncompatibleChanges() *UpdateSchema {
+ us.allowIncompatibleChanges = true
+ return us
+}
+
+// SetCaseSensitive controls case sensitivity for field lookups
+func (us *UpdateSchema) SetCaseSensitive(caseSensitive bool) *UpdateSchema {
+ us.caseSensitive = caseSensitive
+ return us
+}
+
+// AddColumn queues an add column operation - validation deferred until Build
is called
+func (us *UpdateSchema) AddColumn(path []string, required bool, dataType
iceberg.Type, doc string, initialDefault any) *UpdateSchema {
+ op := &addColumnOp{
+ path: path,
+ required: required,
+ dataType: dataType,
+ doc: doc,
+ initialDefault: initialDefault,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// UpdateColumn queues an update column operation - validation deferred until
Build is called
+func (us *UpdateSchema) UpdateColumn(path []string, updates ColumnUpdate)
*UpdateSchema {
+ op := &updateColumnOp{
+ path: path,
+ updates: updates,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// DeleteColumn queues a delete column operation - validation deferred until
Build is called
+func (us *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+ op := &deleteColumnOp{
+ path: path,
+ }
+ us.operations = append(us.operations, op)
+ return us
+}
+
+// Move queues a move column operation - validation deferred until Build is
called
+func (us *UpdateSchema) Move(columnToMove, referenceColumn []string, op
moveOp) *UpdateSchema {
+ moveOp := &moveColumnOp{
+ columnToMove: columnToMove,
+ referenceColumn: referenceColumn,
+ op: op,
+ }
+ us.operations = append(us.operations, moveOp)
+ return us
+}
+
+/// Operation Methods
+
+// Reset clears all queued operations and errors - validation deferred until
Build is called
+func (us *UpdateSchema) Reset() *UpdateSchema {
+ us.operations = us.operations[:0]
+ us.errors = us.errors[:0]
+ return us
+}
+
+// RemoveLastOperation removes the most recently added operation
+func (us *UpdateSchema) RemoveLastOperation() *UpdateSchema {
+ if len(us.operations) > 0 {
+ us.operations = us.operations[:len(us.operations)-1]
+ }
+ return us
+}
+
+// GetQueuedOperations returns a copy of the queued operations
+func (us *UpdateSchema) GetQueuedOperations() []string {
+ result := make([]string, len(us.operations))
+ for i, op := range us.operations {
+ result[i] = op.String()
+ }
+ return result
+}
+
+// Validate runs all deferred validations for operations without building
+// basically just checks if the operations are valid
+func (us *UpdateSchema) Validate() error {
+ // Clear the previous errors
+ // because if validation is called multiple times, the errors will
accumulate even after removing operations causing error
+ us.errors = us.errors[:0]
+
+ settings := &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ }
+
+ for _, op := range us.operations {
+ if err := op.Validate(us.schema, settings); err != nil {
+ us.errors = append(us.errors, err)
+ }
+ }
+
+ if len(us.errors) > 0 {
+ return fmt.Errorf("validation failed with %d errors: %v",
len(us.errors), us.errors)
+ }
+ return nil
+}
+
+// Build validates and constructs the final schema
+func (us *UpdateSchema) Build() (*iceberg.Schema, error) {
+ // First validate all operations
+ if err := us.Validate(); err != nil {
+ return nil, err
+ }
+
+ // No operations means no changes
+ if len(us.operations) == 0 {
+ return us.schema, nil
+ }
+
+ // Create builder and apply operations
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges: us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ // Apply operations in order
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, fmt.Errorf("failed to apply operation %s:
%w", op.String(), err)
+ }
+ }
+
+ // Build final schema
+ return us.buildFinalSchema(builder)
+}
+
+// Commit validates, builds, and commits the schema changes
+func (us *UpdateSchema) Commit() error {
+ updates, requirements, err := us.CommitUpdates()
+ if err != nil {
+ return err
+ }
+
+ if len(updates) == 0 {
+ return nil
+ }
+
+ return us.txn.apply(updates, requirements)
+}
+
+// CommitUpdates returns the updates and requirements needed
+func (us *UpdateSchema) CommitUpdates() ([]Update, []Requirement, error) {
+ // If there are no operations, return nil updates and requirements
+ if len(us.operations) == 0 {
+ return nil, nil, nil
+ }
+
+ newSchema, err := us.Build()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Check if equivalent schema exists
+ existingSchemaID := us.findExistingSchemaInTransaction(newSchema)
+
+ var updates []Update
+ var requirements []Requirement
+
+ // for Commit Contention
+ requirements = append(requirements, AssertCurrentSchemaID(us.schema.ID))
+
+ if existingSchemaID == nil {
+ // Get the final lastColumnID from the builder
+ builder := &schemaBuilder{
+ deletes: make(map[int]struct{}),
+ updates: make(map[int]*iceberg.NestedField),
+ adds: make(map[int][]*iceberg.NestedField),
+ moves: make(map[int][]moveReq),
+ lastColumnID: us.lastColumnID,
+ baseSchema: us.schema,
+ settings: &validationSettings{
+ allowIncompatibleChanges:
us.allowIncompatibleChanges,
+ caseSensitive: us.caseSensitive,
+ },
+ }
+
+ for _, op := range us.operations {
+ if err := op.Apply(builder); err != nil {
+ return nil, nil, fmt.Errorf("failed to
recompute lastColumnID: %w", err)
+ }
+ }
+ updates = append(updates,
+ NewAddSchemaUpdate(newSchema, builder.lastColumnID,
false),
+ NewSetCurrentSchemaUpdate(newSchema.ID),
+ )
+ } else {
+ updates = append(updates,
NewSetCurrentSchemaUpdate(*existingSchemaID))
+ }
+
+ if nameMapUpdates := us.getNameMappingUpdates(newSchema);
len(nameMapUpdates) > 0 {
+ updates = append(updates, nameMapUpdates...)
+ }
+
+ return updates, requirements, nil
+}
+
+/// AddColumn Operation
+
+// Implementation of Operation interface for addColumnOp
+func (op *addColumnOp) Validate(schema *iceberg.Schema, settings
*validationSettings) error {
+ if len(op.path) == 0 {
+ return errors.New("AddColumn: path must contain at least the
new column name")
+ }
+
+ // Check if field already exists
+ if existing := findField(schema, op.path, settings.caseSensitive);
existing != nil {
+ return fmt.Errorf("cannot add column; name already exists: %s",
strings.Join(op.path, "."))
+ }
+
+ // Validate parent exists and is a nested type
+ if len(op.path) > 1 {
+ parentPath := op.path[:len(op.path)-1]
+ pf := findField(schema, parentPath, settings.caseSensitive)
+ if pf == nil {
+ return fmt.Errorf("cannot find parent struct: %s",
strings.Join(parentPath, "."))
+ }
+
+ switch pf.Type.(type) {
+ case *iceberg.StructType, *iceberg.MapType, *iceberg.ListType:
Review Comment:
If the parent field found this way is a MapType, we need to check whether
its value type is a struct. Likewise, for a ListType, we need to check whether
its element type is a struct.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]