zeroshade commented on code in PR #467: URL: https://github.com/apache/iceberg-go/pull/467#discussion_r2178445663
########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if 
lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { Review Comment: The whole benefit of returning `*UpdateSpec` is so that you can chain these calls. If we're also returning the error then we, by definition, are preventing the chaining unless someone creates a `must` function, which seems bad. So either we should change the signatures to return only an `error` instead of `(*UpdateSpec, error)`, or we should change this to be more of a Builder pattern where the actual function calls just populate the list of operations (AddField, RemoveField, RenameField, etc.) but don't actually attempt to *perform* the functions until something like `UpdateSpec.Build()` is called, at which point it returns the error. Basically, I'm proposing we should modify this to one of these two patterns: 1. Change all the methods to return *only* `error` instead of `(*UpdateSpec, error)` 2. Change the structure of the `UpdateSpec` struct to facilitate something like: `err := (&UpdateSpec{}).AddField(...).RemoveField(...).RenameField(...).Build()`. I think either of those would be cleaner than this current function signature. 
########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if 
lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. + ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, 
partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && !inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) 
+ } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if 
err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } + + newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpecId := iceberg.InitialPartitionSpecID + for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { + if newSpec.CompatibleWith(&spec) { + newSpecId = spec.ID() + + break + } else if newSpecId <= spec.ID() { + newSpecId = spec.ID() + 1 + } + } + newSpec = iceberg.NewPartitionSpecID(newSpecId, partitionFields...) + + return &newSpec +} + +func (us *UpdateSpec) Commit() error { + updates, requirements, err := us.CommitUpdates() + if err != nil { + return err + } + + if len(updates) == 0 { + return nil + } + + return us.txn.apply(updates, requirements) +} + +func (us *UpdateSpec) CommitUpdates() ([]Update, []Requirement, error) { + newSpec := us.Apply() + updates := make([]Update, 0) + requirements := make([]Requirement, 0) + + if us.txn.tbl.Metadata().DefaultPartitionSpec() != newSpec.ID() { + if us.isNewPartitionSpec(newSpec.ID()) { + updates = append(updates, NewAddPartitionSpecUpdate(newSpec, false)) + updates = append(updates, NewSetDefaultSpecUpdate(-1)) + } else { + updates = append(updates, NewSetDefaultSpecUpdate(newSpec.ID())) + } + requiredLastAssignedPartitionId := us.txn.tbl.Metadata().LastPartitionSpecID() + requirements = append(requirements, AssertLastAssignedPartitionID(*requiredLastAssignedPartitionId)) + } + + return updates, requirements, nil +} + +func (us *UpdateSpec) partitionField(key transformKey, name string) (iceberg.PartitionField, error) { + if us.txn.tbl.Metadata().Version() == 2 { + sourceId, transform := key.SourceId, key.Transform + historicalFields := make([]iceberg.PartitionField, 0) + for _, spec := range 
us.txn.tbl.Metadata().PartitionSpecs() { + for field := range spec.Fields() { + historicalFields = append(historicalFields, field) + } Review Comment: ```suggestion historicalFields = slices.AppendSeq(historicalFields, spec.Fields()) ``` ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. 
+ ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && 
!inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) 
Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } + + newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpecId := iceberg.InitialPartitionSpecID + for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { + if newSpec.CompatibleWith(&spec) { + newSpecId = spec.ID() + + break + } else if newSpecId <= spec.ID() { + newSpecId = spec.ID() + 1 + } + } + newSpec = iceberg.NewPartitionSpecID(newSpecId, partitionFields...) 
+ + return &newSpec +} + +func (us *UpdateSpec) Commit() error { + updates, requirements, err := us.CommitUpdates() + if err != nil { + return err + } + + if len(updates) == 0 { + return nil + } + + return us.txn.apply(updates, requirements) +} + +func (us *UpdateSpec) CommitUpdates() ([]Update, []Requirement, error) { + newSpec := us.Apply() + updates := make([]Update, 0) + requirements := make([]Requirement, 0) + + if us.txn.tbl.Metadata().DefaultPartitionSpec() != newSpec.ID() { + if us.isNewPartitionSpec(newSpec.ID()) { + updates = append(updates, NewAddPartitionSpecUpdate(newSpec, false)) + updates = append(updates, NewSetDefaultSpecUpdate(-1)) + } else { + updates = append(updates, NewSetDefaultSpecUpdate(newSpec.ID())) + } + requiredLastAssignedPartitionId := us.txn.tbl.Metadata().LastPartitionSpecID() + requirements = append(requirements, AssertLastAssignedPartitionID(*requiredLastAssignedPartitionId)) + } + + return updates, requirements, nil +} + +func (us *UpdateSpec) partitionField(key transformKey, name string) (iceberg.PartitionField, error) { + if us.txn.tbl.Metadata().Version() == 2 { + sourceId, transform := key.SourceId, key.Transform + historicalFields := make([]iceberg.PartitionField, 0) + for _, spec := range us.txn.tbl.Metadata().PartitionSpecs() { + for field := range spec.Fields() { + historicalFields = append(historicalFields, field) + } + } + for _, fields := range historicalFields { + if fields.SourceID == sourceId && fields.Transform.String() == transform { + return iceberg.PartitionField{ + SourceID: sourceId, + FieldID: fields.FieldID, + Name: name, + Transform: fields.Transform, + }, nil + } + } + } + newFieldId := us.newFieldId() + transform, _ := iceberg.ParseTransform(key.Transform) + if name == "" { + tmp_field := iceberg.PartitionField{ + SourceID: key.SourceId, + FieldID: newFieldId, + Name: "", + Transform: transform, + } + var err error + name, err = iceberg.GeneratePartitionFieldName(us.txn.tbl.Schema(), tmp_field) + if err 
!= nil { + return iceberg.PartitionField{}, err + } + } + + return iceberg.PartitionField{ + SourceID: key.SourceId, + FieldID: newFieldId, + Name: name, + Transform: transform, + }, nil +} + +func (us *UpdateSpec) newFieldId() int { + us.lastAssignedFieldId += 1 + + return us.lastAssignedFieldId +} + +func (us *UpdateSpec) isDuplicatePartition(transform iceberg.Transform, partitionField iceberg.PartitionField) bool { + _, exists := us.deletes[partitionField.FieldID] + + return !exists && transform == partitionField.Transform +} + +func (us *UpdateSpec) checkAndAddPartitionName(schema *iceberg.Schema, name string, sourceId int, partitionNames map[string]bool) error { + field, found := schema.FindFieldByName(name) + if found && field.ID != sourceId { + return fmt.Errorf("cannot create partition from name that exists in schema %s", name) + } + if _, exists := partitionNames[name]; exists { + return fmt.Errorf("partition name has to be unique: %s", name) + } + partitionNames[name] = true + + return nil +} + +func (us *UpdateSpec) addNewField(schema *iceberg.Schema, sourceId int, fieldId int, name string, transform iceberg.Transform, partitionNames map[string]bool) (iceberg.PartitionField, error) { + err := us.checkAndAddPartitionName(schema, name, sourceId, partitionNames) + if err != nil { + return iceberg.PartitionField{}, err + } + + return iceberg.PartitionField{ + SourceID: sourceId, + FieldID: fieldId, + Name: name, + Transform: transform, + }, nil +} + +func (us *UpdateSpec) isNewPartitionSpec(newSpecId int) bool { + for _, spec := range us.txn.tbl.Metadata().PartitionSpecs() { + if spec.ID() == newSpecId { + return false + } + } + + return true Review Comment: maybe ```go return !slices.ContainsFunc(us.txn.tbl.Metadata().PartitionSpecs(), func(s iceberg.PartitionSpec) bool { return s.ID() == newSpecId }) ``` ########## partitions.go: ########## @@ -28,7 +28,7 @@ import ( ) const ( - partitionDataIDStart = 1000 + PartitionDataIDStart = 1000 
InitialPartitionSpecID = 0 ) Review Comment: Is there a particular reason we need to expose this constant? ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = 
partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. + ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) Review Comment: same comment as above, no need to explicitly call `.String()` ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + 
nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. + ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added 
partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && !inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + 
existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := 
iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } + + newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpecId := iceberg.InitialPartitionSpecID + for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { + if newSpec.CompatibleWith(&spec) { + newSpecId = spec.ID() + + break + } else if newSpecId <= spec.ID() { + newSpecId = spec.ID() + 1 + } + } + newSpec = iceberg.NewPartitionSpecID(newSpecId, partitionFields...) + + return &newSpec +} + +func (us *UpdateSpec) Commit() error { + updates, requirements, err := us.CommitUpdates() + if err != nil { + return err + } + + if len(updates) == 0 { + return nil + } + + return us.txn.apply(updates, requirements) +} + +func (us *UpdateSpec) CommitUpdates() ([]Update, []Requirement, error) { + newSpec := us.Apply() + updates := make([]Update, 0) + requirements := make([]Requirement, 0) + + if us.txn.tbl.Metadata().DefaultPartitionSpec() != newSpec.ID() { + if us.isNewPartitionSpec(newSpec.ID()) { + updates = append(updates, NewAddPartitionSpecUpdate(newSpec, false)) + updates = append(updates, NewSetDefaultSpecUpdate(-1)) + } else { + updates = append(updates, NewSetDefaultSpecUpdate(newSpec.ID())) + } + requiredLastAssignedPartitionId := us.txn.tbl.Metadata().LastPartitionSpecID() + requirements = append(requirements, AssertLastAssignedPartitionID(*requiredLastAssignedPartitionId)) + } + + return updates, requirements, nil +} + +func (us *UpdateSpec) partitionField(key transformKey, name string) (iceberg.PartitionField, error) { + if us.txn.tbl.Metadata().Version() == 2 { + sourceId, transform := key.SourceId, key.Transform + historicalFields := make([]iceberg.PartitionField, 0) + for _, spec := range us.txn.tbl.Metadata().PartitionSpecs() { + for field := range spec.Fields() { + historicalFields = append(historicalFields, field) + } + } + for _, fields := range 
historicalFields { + if fields.SourceID == sourceId && fields.Transform.String() == transform { + return iceberg.PartitionField{ + SourceID: sourceId, + FieldID: fields.FieldID, + Name: name, + Transform: fields.Transform, + }, nil + } + } + } + newFieldId := us.newFieldId() + transform, _ := iceberg.ParseTransform(key.Transform) + if name == "" { + tmp_field := iceberg.PartitionField{ + SourceID: key.SourceId, + FieldID: newFieldId, + Name: "", + Transform: transform, + } + var err error + name, err = iceberg.GeneratePartitionFieldName(us.txn.tbl.Schema(), tmp_field) + if err != nil { + return iceberg.PartitionField{}, err + } + } + + return iceberg.PartitionField{ + SourceID: key.SourceId, + FieldID: newFieldId, + Name: name, + Transform: transform, + }, nil +} + +func (us *UpdateSpec) newFieldId() int { + us.lastAssignedFieldId += 1 + + return us.lastAssignedFieldId +} + +func (us *UpdateSpec) isDuplicatePartition(transform iceberg.Transform, partitionField iceberg.PartitionField) bool { + _, exists := us.deletes[partitionField.FieldID] + + return !exists && transform == partitionField.Transform Review Comment: should use `transform.Equals(partitionField.Transform)` ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) 
{ + // Finds the column in the schema and binds it with case sensitivity. + ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) Review Comment: You don't need to explicitly call `.String()` when passing things to `fmt.Errorf`: since you're using `%s`, it will automatically call the value's `String()` method to create the string. So you can simplify this a bit and remove those explicit calls. ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. 
+ ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && 
!inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) 
Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } + + newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpecId := iceberg.InitialPartitionSpecID + for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { + if newSpec.CompatibleWith(&spec) { + newSpecId = spec.ID() + + break + } else if newSpecId <= spec.ID() { + newSpecId = spec.ID() + 1 + } + } + newSpec = iceberg.NewPartitionSpecID(newSpecId, partitionFields...) 
+ + return &newSpec +} + +func (us *UpdateSpec) Commit() error { + updates, requirements, err := us.CommitUpdates() + if err != nil { + return err + } + + if len(updates) == 0 { + return nil + } + + return us.txn.apply(updates, requirements) +} + +func (us *UpdateSpec) CommitUpdates() ([]Update, []Requirement, error) { Review Comment: it doesn't look like there's anything that can return an error here, can we remove the `error` from the signature? ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. 
+ ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && 
!inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) 
Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } Review Comment: ```suggestion partitionFields = append(partitionFields, us.adds...) ``` ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: 
make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. + ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := 
newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && !inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && 
isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) Apply() *iceberg.PartitionSpec { Review Comment: Is there a good reason to return a `*iceberg.PartitionSpec` vs just `iceberg.PartitionSpec`? ########## table/update_spec.go: ########## @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "errors" + "fmt" + + "github.com/apache/iceberg-go" +) + +type UpdateSpec struct { + txn *Transaction + nameToField map[string]iceberg.PartitionField + nameToAddedField map[string]iceberg.PartitionField + transformToField map[transformKey]iceberg.PartitionField + transformToAddedField map[transformKey]iceberg.PartitionField + renames map[string]string + addedTimeFields map[int]iceberg.PartitionField + caseSensitive bool + adds []iceberg.PartitionField + deletes map[int]bool + lastAssignedFieldId int +} + +func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec { + transformToField := make(map[transformKey]iceberg.PartitionField) + nameToField := make(map[string]iceberg.PartitionField) + partitionSpec := t.tbl.Metadata().PartitionSpec() + for partitionField := range partitionSpec.Fields() { + transformToField[transformKey{ + SourceId: partitionField.SourceID, + Transform: partitionField.Transform.String(), + }] = partitionField + nameToField[partitionField.Name] = partitionField + } + lastAssignedFieldId := t.tbl.Metadata().LastPartitionSpecID() + if lastAssignedFieldId == nil { + v := iceberg.PartitionDataIDStart - 1 + lastAssignedFieldId = &v + } + + return &UpdateSpec{ + txn: t, + nameToField: nameToField, + nameToAddedField: make(map[string]iceberg.PartitionField), + transformToField: transformToField, + transformToAddedField: make(map[transformKey]iceberg.PartitionField), + renames: make(map[string]string), + addedTimeFields: make(map[int]iceberg.PartitionField), + caseSensitive: caseSensitive, + adds: make([]iceberg.PartitionField, 0), + deletes: make(map[int]bool), + lastAssignedFieldId: *lastAssignedFieldId, + } +} + +type transformKey struct { + SourceId int + Transform string +} + +func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) (*UpdateSpec, error) { + // Finds the column in the schema and binds it with case sensitivity. 
+ ref := iceberg.Reference(sourceColName) + boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive) + if err != nil { + return nil, err + } + + // Validate the transform + outputType := boundTerm.Type() + if !transform.CanTransform(outputType) { + return nil, fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name) + } + + // Check for duplicate transform on same column + key := transformKey{ + SourceId: boundTerm.Ref().Field().ID, + Transform: transform.String(), + } + existingPartitionField, exists := us.transformToField[key] + if exists && us.isDuplicatePartition(transform, existingPartitionField) { + return nil, fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField) + } + + // Check if this transform was already added + added, exists := us.transformToAddedField[key] + if exists { + return nil, fmt.Errorf("already added partition: %s ", added.Name) + } + + // Create the new partition field and Check for name collisions + // with existing fields + newField, err := us.partitionField(key, partitionFieldName) + if err != nil { + return nil, err + } + if _, exists = us.nameToAddedField[newField.Name]; exists { + return nil, fmt.Errorf("already added partition field with name: %s", newField.Name) + } + + // Handle special case for time transforms + if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform { + if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists { + return nil, fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name) + } + us.addedTimeFields[newField.SourceID] = newField + } + us.transformToAddedField[key] = newField + + // If name matches an existing field, rename it (if VOID) + existingPartitionField, exists = us.nameToField[newField.Name] + if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && 
!inDelete { + if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform { + _, err = us.RenameField(existingPartitionField.Name, fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID)) + if err != nil { + return nil, err + } + } else { + return nil, fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name) + } + } + + // Register the new field + us.nameToAddedField[newField.Name] = newField + us.adds = append(us.adds, newField) + + return us, nil +} + +func (us *UpdateSpec) AddIdentity(sourceColName string) (*UpdateSpec, error) { + return us.AddField(sourceColName, iceberg.IdentityTransform{}, "") +} + +func (us *UpdateSpec) RemoveField(name string) (*UpdateSpec, error) { + if _, added := us.nameToAddedField[name]; added { + return nil, fmt.Errorf("cannot remove newly added field %s", name) + } + if _, renamed := us.renames[name]; renamed { + return nil, fmt.Errorf("cannot rename and delete field %s", name) + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field %s", name) + } + us.deletes[field.FieldID] = true + + return us, nil +} + +func (us *UpdateSpec) RenameField(name string, newName string) (*UpdateSpec, error) { + existingField, exists := us.nameToField[newName] + _, isVoidTransform := existingField.Transform.(iceberg.VoidTransform) + if exists && isVoidTransform { + return us.RenameField(name, fmt.Sprintf("%s_%d", name, existingField.FieldID)) + } + if _, added := us.nameToAddedField[name]; added { + return nil, errors.New("cannot rename recently added partitions") + } + field, exists := us.nameToField[name] + if !exists { + return nil, fmt.Errorf("cannot find partition field: %s", name) + } + if _, deleted := us.deletes[field.FieldID]; deleted { + return nil, fmt.Errorf("cannot delete and rename partition field: %s", name) + } + us.renames[name] = newName + + return us, nil +} + +func (us *UpdateSpec) 
Apply() *iceberg.PartitionSpec { + partitionFields := make([]iceberg.PartitionField, 0) + partitionNames := make(map[string]bool) + spec := us.txn.tbl.Metadata().PartitionSpec() + for field := range spec.Fields() { + var newField iceberg.PartitionField + var err error + if _, deleted := us.deletes[field.FieldID]; !deleted { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, field.Transform, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, field.Transform, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } else if us.txn.tbl.Metadata().Version() == 1 { + if rename, renamed := us.renames[field.Name]; renamed { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, iceberg.VoidTransform{}, partitionNames) + } else { + newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) + } + if err != nil { + return nil + } + partitionFields = append(partitionFields, newField) + } + } + + for _, field := range us.adds { + newField := iceberg.PartitionField{ + SourceID: field.SourceID, + FieldID: field.FieldID, + Name: field.Name, + Transform: field.Transform, + } + partitionFields = append(partitionFields, newField) + } + + newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpecId := iceberg.InitialPartitionSpecID + for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { + if newSpec.CompatibleWith(&spec) { + newSpecId = spec.ID() + + break + } else if newSpecId <= spec.ID() { + newSpecId = spec.ID() + 1 + } + } + newSpec = iceberg.NewPartitionSpecID(newSpecId, partitionFields...) 
+ + return &newSpec +} + +func (us *UpdateSpec) Commit() error { + updates, requirements, err := us.CommitUpdates() + if err != nil { + return err + } + + if len(updates) == 0 { + return nil + } + + return us.txn.apply(updates, requirements) +} + +func (us *UpdateSpec) CommitUpdates() ([]Update, []Requirement, error) { + newSpec := us.Apply() + updates := make([]Update, 0) + requirements := make([]Requirement, 0) + + if us.txn.tbl.Metadata().DefaultPartitionSpec() != newSpec.ID() { + if us.isNewPartitionSpec(newSpec.ID()) { + updates = append(updates, NewAddPartitionSpecUpdate(newSpec, false)) + updates = append(updates, NewSetDefaultSpecUpdate(-1)) + } else { + updates = append(updates, NewSetDefaultSpecUpdate(newSpec.ID())) + } + requiredLastAssignedPartitionId := us.txn.tbl.Metadata().LastPartitionSpecID() + requirements = append(requirements, AssertLastAssignedPartitionID(*requiredLastAssignedPartitionId)) + } + + return updates, requirements, nil +} + +func (us *UpdateSpec) partitionField(key transformKey, name string) (iceberg.PartitionField, error) { + if us.txn.tbl.Metadata().Version() == 2 { + sourceId, transform := key.SourceId, key.Transform + historicalFields := make([]iceberg.PartitionField, 0) + for _, spec := range us.txn.tbl.Metadata().PartitionSpecs() { + for field := range spec.Fields() { + historicalFields = append(historicalFields, field) + } + } + for _, fields := range historicalFields { + if fields.SourceID == sourceId && fields.Transform.String() == transform { + return iceberg.PartitionField{ Review Comment: I think this also needs to check `if len(name) > 0 || field.Name == name` before choosing to return the field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org