zeroshade commented on code in PR #146:
URL: https://github.com/apache/iceberg-go/pull/146#discussion_r1760197376


##########
catalog/rest.go:
##########
@@ -546,12 +640,8 @@ func (r *RestCatalog) ListTables(ctx context.Context, 
namespace table.Identifier
        path := []string{"namespaces", ns, "tables"}
 
        type resp struct {
-               Identifiers []struct {
-                       Namespace []string `json:"namespace"`
-                       Name      string   `json:"name"`
-               } `json:"identifiers"`
+               Identifiers []Identifier `json:"identifiers"`

Review Comment:
   Is the `Identifier` type used anywhere other than here? The reason I had 
done it inline here was that it was only used in this one spot and I didn't 
want it to get confused with `table.Identifier`.



##########
catalog/rest.go:
##########
@@ -84,6 +84,86 @@ func (e errorResponse) Error() string {
        return e.Type + ": " + e.Message
 }
 
+type Identifier struct {
+       Namespace []string `json:"namespace"`
+       Name      string   `json:"name"`
+}

Review Comment:
   If we're going to export this type, we should probably name it 
`RestIdentifier` or something equivalent to distinguish it from other catalog 
identifier types.



##########
catalog/rest.go:
##########
@@ -573,64 +663,151 @@ func splitIdentForPath(ident table.Identifier) (string, 
string, error) {
        return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), 
TableNameFromIdent(ident), nil
 }
 
-type tblResponse struct {
-       MetadataLoc string             `json:"metadata-location"`
-       RawMetadata json.RawMessage    `json:"metadata"`
-       Config      iceberg.Properties `json:"config"`
-       Metadata    table.Metadata     `json:"-"`
-}
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOption) 
(*table.Table, error) {
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return nil, err
+       }
 
-func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
-       type Alias tblResponse
-       if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
-               return err
+       payload := createTableRequest{
+               Name:   tbl,
+               Schema: schema,
+       }
+       for _, o := range opts {
+               o(&payload)
        }
 
-       t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
-       return
+       ret, err := doPost[createTableRequest, loadTableResponse](ctx, 
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
+               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, 
http.StatusConflict: ErrTableAlreadyExists})
+       if err != nil {
+               return nil, err
+       }
+
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+       for k, v := range ret.Config {
+               config[k] = v
+       }
+
+       return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
 }
 
-func (r *RestCatalog) LoadTable(ctx context.Context, identifier 
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+func (r *RestCatalog) RegisterTable(ctx context.Context, identifier 
table.Identifier, metadataLoc string) (*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
        }
 
-       if props == nil {
-               props = iceberg.Properties{}
+       type payload struct {
+               Name        string `json:"name"`
+               MetadataLoc string `json:"metadata-location"`
        }
 
-       ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces", 
ns, "tables", tbl},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+       ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
+               payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: 
ErrTableAlreadyExists})
        if err != nil {
                return nil, err
        }
 
-       id := identifier
-       if r.name != "" {
-               id = append([]string{r.name}, identifier...)
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+       for k, v := range ret.Config {
+               config[k] = v
        }

Review Comment:
   Same question here: why not `maps.Copy(config, ret.Config)` instead of the 
manual loop?



##########
schema.go:
##########
@@ -297,14 +297,14 @@ func (s *Schema) accessorForField(id int) (accessor, 
bool) {
 // Equals compares the fields and identifierIDs, but does not compare
 // the schema ID itself.
 func (s *Schema) Equals(other *Schema) bool {
-       if other == nil {
-               return false
-       }
-
        if s == other {
                return true
        }
 
+       if s == nil || other == nil {
+               return false
+       }

Review Comment:
   How did we end up calling this on a nil schema? :(



##########
table/metadata.go:
##########
@@ -80,20 +86,532 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if initial {
+               sortOrders = []SortOrder{*sortOrder}
+       } else {
+               sortOrders = append(b.sortOrderList, *sortOrder)
+       }
+
+       b.sortOrderList = sortOrders
+       b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, 
error) {
+       if len(keys) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+       for _, key := range keys {
+               delete(b.props, key)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
+       if currentSchemaID == -1 {
+               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
+                       return s.ID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-schema" && 
u.(*AddSchemaUpdate).Schema.ID == currentSchemaID
+               }) {
+                       return nil, errors.New("can't set current schema to 
last added schema, no schema has been added")
+               }
+       }
+
+       if currentSchemaID == b.currentSchemaID {
+               return b, nil
+       }
+
+       _, err := b.GetSchemaByID(currentSchemaID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set current schema to schema with 
id %d: %w", currentSchemaID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetCurrentSchemaUpdate(currentSchemaID))
+       b.currentSchemaID = currentSchemaID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) 
(*MetadataBuilder, error) {
+       if defaultSortOrderID == -1 {
+               defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) 
int {
+                       return s.OrderID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-sort-order" && 
u.(*AddSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
+               }) {
+                       return nil, fmt.Errorf("can't set default sort order to 
last added with no added sort orders")
+               }
+       }
+
+       if defaultSortOrderID == b.defaultSortOrderID {
+               return b, nil
+       }
+
+       if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+               return nil, fmt.Errorf("can't set default sort order to sort 
order with id %d: %w", defaultSortOrderID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       b.defaultSortOrderID = defaultSortOrderID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) 
(*MetadataBuilder, error) {
+       if defaultSpecID == -1 {
+               defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) 
int {
+                       return s.ID()
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-partition-spec" && 
u.(*AddPartitionSpecUpdate).Spec.ID() == defaultSpecID
+               }) {
+                       return nil, fmt.Errorf("can't set default spec to last 
added with no added partition specs")
+               }
+       }
+
+       if defaultSpecID == b.defaultSpecID {
+               return b, nil
+       }
+
+       if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+               return nil, fmt.Errorf("can't set default spec to spec with id 
%d: %w", defaultSpecID, err)
+       }
+
+       b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+       b.defaultSpecID = defaultSpecID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int) 
(*MetadataBuilder, error) {
+       if formatVersion < b.formatVersion {
+               return nil, fmt.Errorf("downgrading format version from %d to 
%d is not allowed",
+                       b.formatVersion, formatVersion)
+       }
+
+       if formatVersion > SUPPORTED_TABLE_FORMAT_VERSION {
+               return nil, fmt.Errorf("unsupported format version %d", 
formatVersion)
+       }
+
+       if formatVersion == b.formatVersion {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, 
NewUpgradeFormatVersionUpdate(formatVersion))
+       b.formatVersion = formatVersion

Review Comment:
   Don't we need to perform the migration in this case and actually make the 
changes necessary for the format version upgrade?



##########
table/metadata.go:
##########
@@ -80,20 +86,532 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if initial {
+               sortOrders = []SortOrder{*sortOrder}
+       } else {
+               sortOrders = append(b.sortOrderList, *sortOrder)
+       }
+
+       b.sortOrderList = sortOrders
+       b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, 
error) {
+       if len(keys) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+       for _, key := range keys {
+               delete(b.props, key)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
+       if currentSchemaID == -1 {
+               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
+                       return s.ID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-schema" && 
u.(*AddSchemaUpdate).Schema.ID == currentSchemaID
+               }) {
+                       return nil, errors.New("can't set current schema to 
last added schema, no schema has been added")
+               }
+       }
+
+       if currentSchemaID == b.currentSchemaID {
+               return b, nil
+       }
+
+       _, err := b.GetSchemaByID(currentSchemaID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set current schema to schema with 
id %d: %w", currentSchemaID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetCurrentSchemaUpdate(currentSchemaID))
+       b.currentSchemaID = currentSchemaID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) 
(*MetadataBuilder, error) {
+       if defaultSortOrderID == -1 {
+               defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) 
int {
+                       return s.OrderID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-sort-order" && 
u.(*AddSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
+               }) {
+                       return nil, fmt.Errorf("can't set default sort order to 
last added with no added sort orders")
+               }
+       }
+
+       if defaultSortOrderID == b.defaultSortOrderID {
+               return b, nil
+       }
+
+       if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+               return nil, fmt.Errorf("can't set default sort order to sort 
order with id %d: %w", defaultSortOrderID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       b.defaultSortOrderID = defaultSortOrderID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) 
(*MetadataBuilder, error) {
+       if defaultSpecID == -1 {
+               defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) 
int {
+                       return s.ID()
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-partition-spec" && 
u.(*AddPartitionSpecUpdate).Spec.ID() == defaultSpecID
+               }) {
+                       return nil, fmt.Errorf("can't set default spec to last 
added with no added partition specs")
+               }
+       }
+
+       if defaultSpecID == b.defaultSpecID {
+               return b, nil
+       }
+
+       if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+               return nil, fmt.Errorf("can't set default spec to spec with id 
%d: %w", defaultSpecID, err)
+       }
+
+       b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+       b.defaultSpecID = defaultSpecID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int) 
(*MetadataBuilder, error) {
+       if formatVersion < b.formatVersion {
+               return nil, fmt.Errorf("downgrading format version from %d to 
%d is not allowed",
+                       b.formatVersion, formatVersion)
+       }
+
+       if formatVersion > SUPPORTED_TABLE_FORMAT_VERSION {
+               return nil, fmt.Errorf("unsupported format version %d", 
formatVersion)
+       }
+
+       if formatVersion == b.formatVersion {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, 
NewUpgradeFormatVersionUpdate(formatVersion))
+       b.formatVersion = formatVersion
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
+       if b.loc == loc {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetLocationUpdate(loc))
+       b.loc = loc
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetProperties(props iceberg.Properties) 
(*MetadataBuilder, error) {
+       if len(props) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetPropertiesUpdate(props))
+       maps.Copy(b.props, props)
+       return b, nil
+}
+
+type setSnapshotRefOption func(*SnapshotRef) error
+
+func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxRefAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0", 
iceberg.ErrInvalidArgument, maxRefAgeMs)
+               }
+               ref.MaxRefAgeMs = &maxRefAgeMs
+               return nil
+       }
+}
+
+func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxSnapshotAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be > 
0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs)
+               }
+               ref.MaxSnapshotAgeMs = &maxSnapshotAgeMs
+               return nil
+       }
+}
+
+func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if minSnapshotsToKeep <= 0 {
+                       return fmt.Errorf("%w: minSnapshotsToKeep %d, must be > 
0", iceberg.ErrInvalidArgument, minSnapshotsToKeep)
+               }
+               ref.MinSnapshotsToKeep = &minSnapshotsToKeep
+               return nil
+       }
+}
+
+func (b *MetadataBuilder) SetSnapshotRef(
+       name string,
+       snapshotID int64,
+       refType RefType,
+       options ...setSnapshotRefOption,
+) (*MetadataBuilder, error) {
+       ref := SnapshotRef{
+               SnapshotID:      snapshotID,
+               SnapshotRefType: refType,
+       }
+       for _, opt := range options {
+               if err := opt(&ref); err != nil {
+                       return nil, fmt.Errorf("invalid snapshot ref option: 
%w", err)
+               }
+       }
+
+       var maxRefAgeMs, maxSnapshotAgeMs int64
+       var minSnapshotsToKeep int
+       if ref.MaxRefAgeMs != nil {
+               maxRefAgeMs = *ref.MaxRefAgeMs
+       }
+       if ref.MaxSnapshotAgeMs != nil {
+               maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs
+       }
+       if ref.MinSnapshotsToKeep != nil {
+               minSnapshotsToKeep = *ref.MinSnapshotsToKeep
+       }
+
+       if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) {
+               return b, nil
+       }
+
+       snapshot, err := b.SnapshotByID(snapshotID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set snapshot ref %s to unknown 
snapshot %d: %w", name, snapshotID, err)
+       }
+
+       if refType == MainBranch {
+               b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, 
snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
+               b.currentSnapshotID = &snapshotID
+               b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
+                       SnapshotID:  snapshotID,
+                       TimestampMs: snapshot.TimestampMs,
+               })
+               b.lastUpdatedMS = time.Now().Local().UnixMilli()
+       }
+
+       if containsBy(b.updates, func(u Update) bool {
+               return u.Action() == "add-snapshot" && 
u.(*AddSnapshotUpdate).Snapshot.SnapshotID == snapshotID
+       }) {
+               b.lastUpdatedMS = snapshot.TimestampMs
+       }
+
+       b.refs[name] = ref
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
+       if b.uuid == uuid {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewAssignUUIDUpdate(uuid))
+       b.uuid = uuid
+       return b, nil
+}
+
+func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
+       return &commonMetadata{
+               FormatVersion:      b.formatVersion,
+               UUID:               b.uuid,
+               Loc:                b.loc,
+               LastUpdatedMS:      b.lastUpdatedMS,
+               LastColumnId:       b.lastColumnId,
+               SchemaList:         b.schemaList,
+               CurrentSchemaID:    b.currentSchemaID,
+               Specs:              b.specs,
+               DefaultSpecID:      b.defaultSpecID,
+               LastPartitionID:    b.lastPartitionID,
+               Props:              b.props,
+               SnapshotList:       b.snapshotList,
+               CurrentSnapshotID:  b.currentSnapshotID,
+               SnapshotLog:        b.snapshotLog,
+               MetadataLog:        b.metadataLog,
+               SortOrderList:      b.sortOrderList,
+               DefaultSortOrderID: b.defaultSortOrderID,
+               SnapshotRefs:       b.refs,
+       }
+}
+
+func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
+       for _, s := range b.schemaList {
+               if s.ID == id {
+                       return s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("%w: schema with id %d not found", 
iceberg.ErrInvalidArgument, id)
+}
+
+func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) {
+       for _, s := range b.specs {
+               if s.ID() == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("partition spec with id %d not found", id)
+}
+
+func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) {
+       for _, s := range b.sortOrderList {
+               if s.OrderID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("sort order with id %d not found", id)
+}
+
+func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) {
+       for _, s := range b.snapshotList {
+               if s.SnapshotID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("snapshot with id %d not found", id)
+}
+
+func (b *MetadataBuilder) Build() (Metadata, error) {
+       common := b.buildCommonMetadata()
+       switch b.formatVersion {
+       case 1:
+               schema, err := b.GetSchemaByID(b.currentSchemaID)
+               if err != nil {
+                       return nil, fmt.Errorf("can't build metadata, missing 
schema for schema ID %d: %w", b.currentSchemaID, err)
+               }
+               partition, err := b.GetSpecByID(b.defaultSpecID)
+               if err != nil {
+                       return nil, fmt.Errorf("can't build metadata, missing 
partition spec for spec ID %d: %w", b.defaultSpecID, err)
+               }
+               return &metadataV1{
+                       Schema:         schema,
+                       Partition:      partition.Fields(),
+                       commonMetadata: *common,
+               }, nil
+       case 2:
+               return &metadataV2{
+                       LastSequenceNumber: *b.lastSequenceNumber,
+                       commonMetadata:     *common,
+               }, nil
+       default:
+               panic("unreachable: invalid format version")
+       }
+}
+
+// containsBy returns true if found(e) is true for any e in elems.
+func containsBy[S []E, E any](elems S, found func(e E) bool) bool {
+       for _, e := range elems {
+               if found(e) {
+                       return true
+               }
+       }
+       return false
+}

Review Comment:
   Replace `containsBy` with `slices.ContainsFunc` from the standard library.



##########
table/metadata.go:
##########
@@ -24,12 +24,18 @@ import (
        "io"
        "maps"
        "slices"
+       "time"
 
        "github.com/apache/iceberg-go"
 
        "github.com/google/uuid"
 )
 
+const (
+       PARTITION_FIELD_ID_START       = 1000
+       SUPPORTED_TABLE_FORMAT_VERSION = 2

Review Comment:
   doc strings for these if we're going to export them?



##########
table/metadata.go:
##########
@@ -80,20 +86,532 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if initial {
+               sortOrders = []SortOrder{*sortOrder}
+       } else {
+               sortOrders = append(b.sortOrderList, *sortOrder)
+       }
+

Review Comment:
   Should we validate whether there's an existing sort order with that ID? Or 
   is it fine to just override it?



##########
catalog/rest.go:
##########
@@ -573,64 +663,151 @@ func splitIdentForPath(ident table.Identifier) (string, 
string, error) {
        return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), 
TableNameFromIdent(ident), nil
 }
 
-type tblResponse struct {
-       MetadataLoc string             `json:"metadata-location"`
-       RawMetadata json.RawMessage    `json:"metadata"`
-       Config      iceberg.Properties `json:"config"`
-       Metadata    table.Metadata     `json:"-"`
-}
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOption) 
(*table.Table, error) {
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return nil, err
+       }
 
-func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
-       type Alias tblResponse
-       if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
-               return err
+       payload := createTableRequest{
+               Name:   tbl,
+               Schema: schema,
+       }
+       for _, o := range opts {
+               o(&payload)
        }
 
-       t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
-       return
+       ret, err := doPost[createTableRequest, loadTableResponse](ctx, 
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
+               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, 
http.StatusConflict: ErrTableAlreadyExists})
+       if err != nil {
+               return nil, err
+       }
+
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+       for k, v := range ret.Config {
+               config[k] = v
+       }

Review Comment:
   Why loop instead of just doing `maps.Copy` (which does the loop internally)?



##########
partitions.go:
##########
@@ -117,6 +117,11 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool {
        return ps.id == other.id && slices.Equal(ps.fields, other.fields)
 }
 
+// Fields returns a clone of the partition fields in this spec.
+func (ps *PartitionSpec) Fields() []PartitionField {
+       return slices.Clone(ps.fields)
+}
+

Review Comment:
   If we're okay with bumping to `go1.23`, we could use `iter` here and do 
   `slices.Values(ps.fields)`. This way we don't have to clone the entire slice, 
   but we still disallow users from modifying the slice.
   
   Thus this function would be:
   
   ```go
   func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] {
       return slices.Values(ps.fields)
   }
   ```
   
   and a user would be able to iterate over the fields: 
   
   ```go
   for f := range spec.Fields() {
       // do something
   }
   ```
   
   Alternately, you could use `slices.All` if you want to preserve the `index, 
   value` nature of the range.



##########
table/metadata.go:
##########
@@ -163,10 +681,22 @@ type commonMetadata struct {
        MetadataLog        []MetadataLogEntry      `json:"metadata-log"`
        SortOrderList      []SortOrder             `json:"sort-orders"`
        DefaultSortOrderID int                     
`json:"default-sort-order-id"`
-       Refs               map[string]SnapshotRef  `json:"refs"`
+       SnapshotRefs       map[string]SnapshotRef  `json:"refs"`
 }
 
+func (c *commonMetadata) Ref() SnapshotRef                  { return 
c.SnapshotRefs[MainBranch] }
+func (c *commonMetadata) Refs() map[string]SnapshotRef      { return 
maps.Clone(c.SnapshotRefs) }

Review Comment:
   Let's use `maps.All`, like I mentioned before for `slices.All`/`slices.Values`, 
   so that we can return an iterator without having to clone the whole map.



##########
table/metadata.go:
##########
@@ -163,10 +681,22 @@ type commonMetadata struct {
        MetadataLog        []MetadataLogEntry      `json:"metadata-log"`
        SortOrderList      []SortOrder             `json:"sort-orders"`
        DefaultSortOrderID int                     
`json:"default-sort-order-id"`
-       Refs               map[string]SnapshotRef  `json:"refs"`
+       SnapshotRefs       map[string]SnapshotRef  `json:"refs"`
 }
 
+func (c *commonMetadata) Ref() SnapshotRef                  { return 
c.SnapshotRefs[MainBranch] }
+func (c *commonMetadata) Refs() map[string]SnapshotRef      { return 
maps.Clone(c.SnapshotRefs) }
+func (c *commonMetadata) SnapshotLogs() []SnapshotLogEntry  { return 
slices.Clone(c.SnapshotLog) }
+func (c *commonMetadata) PreviousFiles() []MetadataLogEntry { return 
slices.Clone(c.MetadataLog) }

Review Comment:
   same comment as before about using iterators and `slices.Values` or 
`slices.All`



##########
table/metadata.go:
##########
@@ -80,20 +86,532 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if initial {
+               sortOrders = []SortOrder{*sortOrder}
+       } else {
+               sortOrders = append(b.sortOrderList, *sortOrder)
+       }
+
+       b.sortOrderList = sortOrders
+       b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, 
error) {
+       if len(keys) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+       for _, key := range keys {
+               delete(b.props, key)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
+       if currentSchemaID == -1 {
+               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
+                       return s.ID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-schema" && 
u.(*AddSchemaUpdate).Schema.ID == currentSchemaID

Review Comment:
   should we make these Action strings constants?



##########
table/metadata.go:
##########
@@ -163,10 +681,22 @@ type commonMetadata struct {
        MetadataLog        []MetadataLogEntry      `json:"metadata-log"`
        SortOrderList      []SortOrder             `json:"sort-orders"`
        DefaultSortOrderID int                     
`json:"default-sort-order-id"`
-       Refs               map[string]SnapshotRef  `json:"refs"`
+       SnapshotRefs       map[string]SnapshotRef  `json:"refs"`
 }
 
+func (c *commonMetadata) Ref() SnapshotRef                  { return 
c.SnapshotRefs[MainBranch] }
+func (c *commonMetadata) Refs() map[string]SnapshotRef      { return 
maps.Clone(c.SnapshotRefs) }
+func (c *commonMetadata) SnapshotLogs() []SnapshotLogEntry  { return 
slices.Clone(c.SnapshotLog) }
+func (c *commonMetadata) PreviousFiles() []MetadataLogEntry { return 
slices.Clone(c.MetadataLog) }
 func (c *commonMetadata) Equals(other *commonMetadata) bool {
+       if c == nil || other == nil {
+               return c == other
+       }

Review Comment:
   In what scenario is `c` nil?



##########
table/metadata.go:
##########
@@ -80,20 +86,512 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("invalid last column id %d, must be >= 
%d",
+                       newLastColumnID, b.lastColumnId)
+       }
+
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID))
+       b.lastColumnId = newLastColumnID
+       b.schemaList = append(b.schemaList, schema)
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+
+       lastPartitionID := max(maxFieldID, prev)
+       b.lastPartitionID = &lastPartitionID
+       b.specs = append(b.specs, *spec)
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")

Review Comment:
   `UnsortedSortOrder` is the default returned if there's no sort order set. 
Thus you can have an empty `sortOrderList` and `UnsortedSortOrder` will be 
returned as the default. There's no reason we shouldn't allow a snapshot with 
no sort order.



##########
table/metadata.go:
##########
@@ -80,20 +86,532 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a map of snapshot refs by name.
+       Refs() map[string]SnapshotRef
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() []SnapshotLogEntry
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() []MetadataLogEntry
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+       b.refs = metadata.Refs()
+       b.snapshotLog = metadata.SnapshotLogs()
+       b.metadataLog = metadata.PreviousFiles()
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       var maxFieldID int
+       if len(spec.Fields()) > 0 {
+               maxField := slices.MaxFunc(spec.Fields(), func(a, b 
iceberg.PartitionField) int {
+                       return a.FieldID - b.FieldID
+               })
+               maxFieldID = maxField.FieldID
+       }
+
+       prev := PARTITION_FIELD_ID_START - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if len(b.sortOrderList) == 0 {
+               return nil, errors.New("can't add snapshot with no added sort 
orders")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if initial {
+               sortOrders = []SortOrder{*sortOrder}
+       } else {
+               sortOrders = append(b.sortOrderList, *sortOrder)
+       }
+
+       b.sortOrderList = sortOrders
+       b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, 
error) {
+       if len(keys) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+       for _, key := range keys {
+               delete(b.props, key)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
+       if currentSchemaID == -1 {
+               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
+                       return s.ID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-schema" && 
u.(*AddSchemaUpdate).Schema.ID == currentSchemaID
+               }) {
+                       return nil, errors.New("can't set current schema to 
last added schema, no schema has been added")
+               }
+       }
+
+       if currentSchemaID == b.currentSchemaID {
+               return b, nil
+       }
+
+       _, err := b.GetSchemaByID(currentSchemaID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set current schema to schema with 
id %d: %w", currentSchemaID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetCurrentSchemaUpdate(currentSchemaID))
+       b.currentSchemaID = currentSchemaID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) 
(*MetadataBuilder, error) {
+       if defaultSortOrderID == -1 {
+               defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) 
int {
+                       return s.OrderID
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-sort-order" && 
u.(*AddSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
+               }) {
+                       return nil, fmt.Errorf("can't set default sort order to 
last added with no added sort orders")
+               }
+       }
+
+       if defaultSortOrderID == b.defaultSortOrderID {
+               return b, nil
+       }
+
+       if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+               return nil, fmt.Errorf("can't set default sort order to sort 
order with id %d: %w", defaultSortOrderID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       b.defaultSortOrderID = defaultSortOrderID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) 
(*MetadataBuilder, error) {
+       if defaultSpecID == -1 {
+               defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) 
int {
+                       return s.ID()
+               })
+               if !containsBy(b.updates, func(u Update) bool {
+                       return u.Action() == "add-partition-spec" && 
u.(*AddPartitionSpecUpdate).Spec.ID() == defaultSpecID
+               }) {
+                       return nil, fmt.Errorf("can't set default spec to last 
added with no added partition specs")
+               }
+       }
+
+       if defaultSpecID == b.defaultSpecID {
+               return b, nil
+       }
+
+       if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+               return nil, fmt.Errorf("can't set default spec to spec with id 
%d: %w", defaultSpecID, err)
+       }
+
+       b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+       b.defaultSpecID = defaultSpecID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int) 
(*MetadataBuilder, error) {
+       if formatVersion < b.formatVersion {
+               return nil, fmt.Errorf("downgrading format version from %d to 
%d is not allowed",
+                       b.formatVersion, formatVersion)
+       }
+
+       if formatVersion > SUPPORTED_TABLE_FORMAT_VERSION {
+               return nil, fmt.Errorf("unsupported format version %d", 
formatVersion)
+       }
+
+       if formatVersion == b.formatVersion {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, 
NewUpgradeFormatVersionUpdate(formatVersion))
+       b.formatVersion = formatVersion
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
+       if b.loc == loc {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetLocationUpdate(loc))
+       b.loc = loc
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetProperties(props iceberg.Properties) 
(*MetadataBuilder, error) {
+       if len(props) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetPropertiesUpdate(props))
+       maps.Copy(b.props, props)
+       return b, nil
+}
+
+type setSnapshotRefOption func(*SnapshotRef) error
+
+func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxRefAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0", 
iceberg.ErrInvalidArgument, maxRefAgeMs)
+               }
+               ref.MaxRefAgeMs = &maxRefAgeMs
+               return nil
+       }
+}
+
+func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxSnapshotAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be > 
0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs)
+               }
+               ref.MaxSnapshotAgeMs = &maxSnapshotAgeMs
+               return nil
+       }
+}
+
+func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if minSnapshotsToKeep <= 0 {
+                       return fmt.Errorf("%w: minSnapshotsToKeep %d, must be > 
0", iceberg.ErrInvalidArgument, minSnapshotsToKeep)
+               }
+               ref.MinSnapshotsToKeep = &minSnapshotsToKeep
+               return nil
+       }
+}
+
+func (b *MetadataBuilder) SetSnapshotRef(
+       name string,
+       snapshotID int64,
+       refType RefType,
+       options ...setSnapshotRefOption,
+) (*MetadataBuilder, error) {
+       ref := SnapshotRef{
+               SnapshotID:      snapshotID,
+               SnapshotRefType: refType,
+       }
+       for _, opt := range options {
+               if err := opt(&ref); err != nil {
+                       return nil, fmt.Errorf("invalid snapshot ref option: 
%w", err)
+               }
+       }
+
+       var maxRefAgeMs, maxSnapshotAgeMs int64
+       var minSnapshotsToKeep int
+       if ref.MaxRefAgeMs != nil {
+               maxRefAgeMs = *ref.MaxRefAgeMs
+       }
+       if ref.MaxSnapshotAgeMs != nil {
+               maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs
+       }
+       if ref.MinSnapshotsToKeep != nil {
+               minSnapshotsToKeep = *ref.MinSnapshotsToKeep
+       }
+
+       if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) {
+               return b, nil
+       }
+
+       snapshot, err := b.SnapshotByID(snapshotID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set snapshot ref %s to unknown 
snapshot %d: %w", name, snapshotID, err)
+       }
+
+       if refType == MainBranch {
+               b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, 
snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
+               b.currentSnapshotID = &snapshotID
+               b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
+                       SnapshotID:  snapshotID,
+                       TimestampMs: snapshot.TimestampMs,
+               })
+               b.lastUpdatedMS = time.Now().Local().UnixMilli()
+       }
+
+       if containsBy(b.updates, func(u Update) bool {
+               return u.Action() == "add-snapshot" && 
u.(*AddSnapshotUpdate).Snapshot.SnapshotID == snapshotID
+       }) {
+               b.lastUpdatedMS = snapshot.TimestampMs
+       }
+
+       b.refs[name] = ref
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
+       if b.uuid == uuid {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewAssignUUIDUpdate(uuid))
+       b.uuid = uuid
+       return b, nil
+}
+
+func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
+       return &commonMetadata{
+               FormatVersion:      b.formatVersion,
+               UUID:               b.uuid,
+               Loc:                b.loc,
+               LastUpdatedMS:      b.lastUpdatedMS,
+               LastColumnId:       b.lastColumnId,
+               SchemaList:         b.schemaList,
+               CurrentSchemaID:    b.currentSchemaID,
+               Specs:              b.specs,
+               DefaultSpecID:      b.defaultSpecID,
+               LastPartitionID:    b.lastPartitionID,
+               Props:              b.props,
+               SnapshotList:       b.snapshotList,
+               CurrentSnapshotID:  b.currentSnapshotID,
+               SnapshotLog:        b.snapshotLog,
+               MetadataLog:        b.metadataLog,
+               SortOrderList:      b.sortOrderList,
+               DefaultSortOrderID: b.defaultSortOrderID,
+               SnapshotRefs:       b.refs,
+       }
+}
+
+func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
+       for _, s := range b.schemaList {
+               if s.ID == id {
+                       return s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("%w: schema with id %d not found", 
iceberg.ErrInvalidArgument, id)
+}
+
+func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) {
+       for _, s := range b.specs {
+               if s.ID() == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("partition spec with id %d not found", id)
+}
+
+func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) {
+       for _, s := range b.sortOrderList {
+               if s.OrderID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("sort order with id %d not found", id)
+}
+
+func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) {
+       for _, s := range b.snapshotList {
+               if s.SnapshotID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("snapshot with id %d not found", id)
+}
+
+func (b *MetadataBuilder) Build() (Metadata, error) {
+       common := b.buildCommonMetadata()
+       switch b.formatVersion {
+       case 1:
+               schema, err := b.GetSchemaByID(b.currentSchemaID)
+               if err != nil {
+                       return nil, fmt.Errorf("can't build metadata, missing 
schema for schema ID %d: %w", b.currentSchemaID, err)
+               }
+               partition, err := b.GetSpecByID(b.defaultSpecID)
+               if err != nil {
+                       return nil, fmt.Errorf("can't build metadata, missing 
partition spec for spec ID %d: %w", b.defaultSpecID, err)
+               }
+               return &metadataV1{
+                       Schema:         schema,
+                       Partition:      partition.Fields(),
+                       commonMetadata: *common,
+               }, nil
+       case 2:
+               return &metadataV2{
+                       LastSequenceNumber: *b.lastSequenceNumber,
+                       commonMetadata:     *common,
+               }, nil
+       default:
+               panic("unreachable: invalid format version")
+       }
+}
+
+// containsBy returns true if found(e) is true for any e in elems.
+func containsBy[S []E, E any](elems S, found func(e E) bool) bool {
+       for _, e := range elems {
+               if found(e) {
+                       return true
+               }
+       }
+       return false
+}
+
+// maxBy returns the maximum value of extract(e) for all e in elems.
+// If elems is empty, returns 0.
+func maxBy[S []E, E any](elems S, extract func(e E) int) int {

Review Comment:
   Use `~[]E` in the type constraint (i.e. `S ~[]E`) so that named slice types 
are accepted as well; this gives better generics coverage and future-proofing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to