jwtryg commented on code in PR #146:
URL: https://github.com/apache/iceberg-go/pull/146#discussion_r1754963661
##########
table/metadata.go:
##########
@@ -80,20 +86,512 @@ type Metadata interface {
SnapshotByName(name string) *Snapshot
// CurrentSnapshot returns the table's current snapshot.
CurrentSnapshot() *Snapshot
+ // Ref returns the snapshot ref for the main branch.
+ Ref() SnapshotRef
+ // Refs returns a map of snapshot refs by name.
+ Refs() map[string]SnapshotRef
+ // SnapshotLogs returns the list of snapshot logs for the table.
+ SnapshotLogs() []SnapshotLogEntry
// SortOrder returns the table's current sort order, ie: the one with
the
// ID that matches the default-sort-order-id.
SortOrder() SortOrder
// SortOrders returns the list of sort orders in the table.
SortOrders() []SortOrder
+ // DefaultSortOrder returns the ID of the current sort order that
writers
+ // should use by default.
+ DefaultSortOrder() int
// Properties is a string to string map of table properties. This is
used
// to control settings that affect reading and writing and is not
intended
// to be used for arbitrary metadata. For example,
commit.retry.num-retries
// is used to control the number of commit retries.
Properties() iceberg.Properties
+ // PreviousFiles returns the list of metadata log entries for the table.
+ PreviousFiles() []MetadataLogEntry
Equals(Metadata) bool
}
+// MetadataBuilder holds a working copy of a table's metadata fields together
+// with the list of updates that have been applied to that copy. Its methods
+// adjust the working fields and append the matching Update to updates.
+type MetadataBuilder struct {
+ // base is the metadata the builder was seeded from by
+ // MetadataBuilderFromBase; NewMetadataBuilder leaves it unset.
+ base Metadata
+ // updates collects one entry per change recorded through the builder.
+ updates []Update
+
+ // common fields
+ formatVersion int
+ uuid uuid.UUID
+ loc string
+ lastUpdatedMS int64
+ lastColumnId int
+ schemaList []*iceberg.Schema
+ currentSchemaID int
+ specs []iceberg.PartitionSpec
+ defaultSpecID int
+ // lastPartitionID is nil until a partition spec has been added or a base
+ // supplied one; AddPartitionSpec treats nil as PARTITION_FIELD_ID_START - 1.
+ lastPartitionID *int
+ props iceberg.Properties
+ snapshotList []Snapshot
+ currentSnapshotID *int64
+ snapshotLog []SnapshotLogEntry
+ metadataLog []MetadataLogEntry
+ sortOrderList []SortOrder
+ defaultSortOrderID int
+ refs map[string]SnapshotRef
+
+ // V2 specific
+ // lastSequenceNumber is written by AddSnapshot and stays nil until then.
+ lastSequenceNumber *int64
+}
+
+// NewMetadataBuilder returns a builder that has no base metadata. All slice
+// and map fields start out empty but non-nil; scalar and pointer fields keep
+// their zero values. The returned error is always nil.
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+	builder := new(MetadataBuilder)
+
+	builder.updates = []Update{}
+	builder.schemaList = []*iceberg.Schema{}
+	builder.specs = []iceberg.PartitionSpec{}
+	builder.props = make(iceberg.Properties)
+	builder.snapshotList = []Snapshot{}
+	builder.snapshotLog = []SnapshotLogEntry{}
+	builder.metadataLog = []MetadataLogEntry{}
+	builder.sortOrderList = []SortOrder{}
+	builder.refs = map[string]SnapshotRef{}
+
+	return builder, nil
+}
+
+// MetadataBuilderFromBase returns a builder whose working fields are seeded
+// from metadata, which is also retained as the builder's base.
+//
+// Slices and maps are copied so that later changes made through the builder
+// (append, delete, maps.Copy) cannot reach back into the base metadata. The
+// properties and refs maps are always non-nil on the returned builder.
+//
+// It returns an error if metadata is nil. When the base has no current
+// snapshot, currentSnapshotID is left nil.
+//
+// NOTE(review): lastSequenceNumber is not seeded here; no accessor for it is
+// visible on Metadata in this hunk - confirm whether V2 bases need it.
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+	if metadata == nil {
+		return nil, errors.New("can't create metadata builder from nil metadata")
+	}
+
+	b := &MetadataBuilder{}
+	b.base = metadata
+
+	b.formatVersion = metadata.Version()
+	b.uuid = metadata.TableUUID()
+	b.loc = metadata.Location()
+	b.lastUpdatedMS = metadata.LastUpdatedMillis()
+	b.lastColumnId = metadata.LastColumnID()
+	b.schemaList = slices.Clone(metadata.Schemas())
+	b.currentSchemaID = metadata.CurrentSchema().ID
+	b.specs = slices.Clone(metadata.PartitionSpecs())
+	b.defaultSpecID = metadata.DefaultPartitionSpec()
+	b.lastPartitionID = metadata.LastPartitionSpecID()
+	b.snapshotList = slices.Clone(metadata.Snapshots())
+	b.sortOrderList = slices.Clone(metadata.SortOrders())
+	b.defaultSortOrderID = metadata.DefaultSortOrder()
+	b.snapshotLog = slices.Clone(metadata.SnapshotLogs())
+	b.metadataLog = slices.Clone(metadata.PreviousFiles())
+
+	// maps.Clone keeps a nil map nil, and writing to a nil map panics, so
+	// fall back to an empty map for the fields the builder writes into.
+	b.props = maps.Clone(metadata.Properties())
+	if b.props == nil {
+		b.props = make(iceberg.Properties)
+	}
+	b.refs = maps.Clone(metadata.Refs())
+	if b.refs == nil {
+		b.refs = make(map[string]SnapshotRef)
+	}
+
+	// CurrentSnapshot returns a pointer; guard it and copy the ID so the
+	// builder does not hold a pointer into the base's snapshot.
+	if snap := metadata.CurrentSnapshot(); snap != nil {
+		id := snap.SnapshotID
+		b.currentSnapshotID = &id
+	}
+
+	return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID
int) (*MetadataBuilder, error) {
+ if newLastColumnID < b.lastColumnId {
+ return nil, fmt.Errorf("invalid last column id %d, must be >=
%d",
+ newLastColumnID, b.lastColumnId)
+ }
+
+ b.updates = append(b.updates, NewAddSchemaUpdate(schema,
newLastColumnID))
+ b.lastColumnId = newLastColumnID
+ b.schemaList = append(b.schemaList, schema)
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec,
initial bool) (*MetadataBuilder, error) {
+ for _, s := range b.specs {
+ if s.ID() == spec.ID() && !initial {
+ return nil, fmt.Errorf("partition spec with id %d
already exists", spec.ID())
+ }
+ }
+
+ var maxFieldID int
+ if len(spec.Fields()) > 0 {
+ maxField := slices.MaxFunc(spec.Fields(), func(a, b
iceberg.PartitionField) int {
+ return a.FieldID - b.FieldID
+ })
+ maxFieldID = maxField.FieldID
+ }
+
+ prev := PARTITION_FIELD_ID_START - 1
+ if b.lastPartitionID != nil {
+ prev = *b.lastPartitionID
+ }
+
+ lastPartitionID := max(maxFieldID, prev)
+ b.lastPartitionID = &lastPartitionID
+ b.specs = append(b.specs, *spec)
+ b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+ return b, nil
+}
+
+// AddSnapshot appends snapshot to the builder's snapshot list, records the
+// matching add-snapshot update, and sets the builder's last-updated
+// timestamp and last sequence number from the snapshot.
+//
+// It returns an error, leaving the builder unchanged, when:
+//   - snapshot is nil;
+//   - the builder has no schemas, no partition specs, or no sort orders;
+//   - a snapshot with the same ID is already present;
+//   - the format version is 2, the snapshot has a parent and a positive
+//     sequence number, and that number is not greater than the last
+//     sequence number seen by the builder.
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, error) {
+	if snapshot == nil {
+		return nil, errors.New("can't add a nil snapshot")
+	}
+
+	switch {
+	case len(b.schemaList) == 0:
+		return nil, errors.New("can't add snapshot with no added schemas")
+	case len(b.specs) == 0:
+		return nil, errors.New("can't add snapshot with no added partition specs")
+	case len(b.sortOrderList) == 0:
+		return nil, errors.New("can't add snapshot with no added sort orders")
+	}
+
+	// Only presence matters here, so the lookup error is deliberately ignored.
+	if existing, _ := b.SnapshotByID(snapshot.SnapshotID); existing != nil {
+		return nil, fmt.Errorf("can't add snapshot with id %d, already exists", snapshot.SnapshotID)
+	}
+
+	// lastSequenceNumber stays nil until the first snapshot is added; treat
+	// that as 0 instead of dereferencing a nil pointer.
+	var lastSeq int64
+	if b.lastSequenceNumber != nil {
+		lastSeq = *b.lastSequenceNumber
+	}
+
+	if b.formatVersion == 2 &&
+		snapshot.SequenceNumber > 0 &&
+		snapshot.SequenceNumber <= lastSeq &&
+		snapshot.ParentSnapshotID != nil {
+		return nil, fmt.Errorf("can't add snapshot with sequence number %d, must be > than last sequence number %d",
+			snapshot.SequenceNumber, lastSeq)
+	}
+
+	// Store a copy so later changes to the caller's snapshot cannot alter
+	// the builder's last sequence number.
+	seq := snapshot.SequenceNumber
+
+	b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+	b.lastUpdatedMS = snapshot.TimestampMs
+	b.lastSequenceNumber = &seq
+	b.snapshotList = append(b.snapshotList, *snapshot)
+
+	return b, nil
+}
+
+// AddSortOrder records an add-sort-order update for sortOrder and appends a
+// copy of it to the builder's sort order list. The returned error is always
+// nil.
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder) (*MetadataBuilder, error) {
+	update := NewAddSortOrderUpdate(sortOrder)
+	b.updates = append(b.updates, update)
+
+	added := *sortOrder
+	b.sortOrderList = append(b.sortOrderList, added)
+
+	return b, nil
+}
+
+// RemoveProperties deletes every key in keys from the builder's properties
+// and records a single remove-properties update for the whole list. An empty
+// keys list is a no-op and records nothing. The returned error is always nil.
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, error) {
+	if len(keys) > 0 {
+		b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+		for i := range keys {
+			delete(b.props, keys[i])
+		}
+	}
+
+	return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int)
(*MetadataBuilder, error) {
+ if currentSchemaID == -1 {
+ currentSchemaID = b.MaxSchemaID()
+ if !b.isAddedSchemaID(currentSchemaID) {
+ return nil, errors.New("can't set current schema to
last added schema, no schema has been added")
+ }
+ }
+
+ if currentSchemaID == b.currentSchemaID {
+ return b, nil
+ }
+
+ _, err := b.GetSchemaByID(currentSchemaID)
+ if err != nil {
+ return nil, fmt.Errorf("can't set current schema to schema with
id %d: %w", currentSchemaID, err)
+ }
+
+ b.updates = append(b.updates,
NewSetCurrentSchemaUpdate(currentSchemaID))
+ b.currentSchemaID = currentSchemaID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int)
(*MetadataBuilder, error) {
+ if defaultSortOrderID == -1 {
+ defaultSortOrderID = b.MaxSortOrderID()
+ if !b.isAddedSortOrder(defaultSortOrderID) {
+ return nil, fmt.Errorf("can't set default sort order to
last added with no added sort orders")
+ }
+ }
+
+ if defaultSortOrderID == b.defaultSortOrderID {
+ return b, nil
+ }
+
+ if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+ return nil, fmt.Errorf("can't set default sort order to sort
order with id %d: %w", defaultSortOrderID, err)
+ }
+
+ b.updates = append(b.updates,
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+ b.defaultSortOrderID = defaultSortOrderID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int)
(*MetadataBuilder, error) {
+ if defaultSpecID == -1 {
+ defaultSpecID = b.MaxSpecID()
+ if !b.isAddedSpecID(defaultSpecID) {
+ return nil, fmt.Errorf("can't set default spec to last
added with no added partition specs")
+ }
+ }
+
+ if defaultSpecID == b.defaultSpecID {
+ return b, nil
+ }
+
+ if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+ return nil, fmt.Errorf("can't set default spec to spec with id
%d: %w", defaultSpecID, err)
+ }
+
+ b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+ b.defaultSpecID = defaultSpecID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int)
(*MetadataBuilder, error) {
+ if formatVersion < b.formatVersion {
+ return nil, fmt.Errorf("downgrading format version from %d to
%d is not allowed",
+ b.formatVersion, formatVersion)
+ }
+
+ if formatVersion > SUPPORTED_TABLE_FORMAT_VERSION {
+ return nil, fmt.Errorf("unsupported format version %d",
formatVersion)
+ }
+
+ if formatVersion == b.formatVersion {
+ return b, nil
+ }
+
+ b.updates = append(b.updates,
NewUpgradeFormatVersionUpdate(formatVersion))
+ b.formatVersion = formatVersion
+ return b, nil
+}
+
+// SetLoc changes the builder's location to loc and records a set-location
+// update. When loc equals the current location nothing is recorded. The
+// returned error is always nil.
+func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
+	if loc != b.loc {
+		b.updates = append(b.updates, NewSetLocationUpdate(loc))
+		b.loc = loc
+	}
+
+	return b, nil
+}
+
+// SetProperties merges props into the builder's properties, overwriting any
+// keys that already exist, and records a single set-properties update. An
+// empty props map is a no-op and records nothing. The returned error is
+// always nil.
+func (b *MetadataBuilder) SetProperties(props iceberg.Properties) (*MetadataBuilder, error) {
+	if len(props) == 0 {
+		return b, nil
+	}
+
+	// b.props can be nil (zero-value builder, or a base whose Properties()
+	// is nil); maps.Copy into a nil map would panic.
+	if b.props == nil {
+		b.props = make(iceberg.Properties, len(props))
+	}
+
+	b.updates = append(b.updates, NewSetPropertiesUpdate(props))
+	maps.Copy(b.props, props)
+
+	return b, nil
+}
+
+func (b *MetadataBuilder) SetSnapshotRef(
+ name string,
+ snapshotID int64,
+ refType RefType,
+ maxRefAgeMs, maxSnapshotAgeMs *int64,
Review Comment:
Updated the logic here to accept options.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]