zeroshade commented on code in PR #146: URL: https://github.com/apache/iceberg-go/pull/146#discussion_r1882697400
########## table/metadata.go: ########## @@ -80,20 +92,544 @@ type Metadata interface { SnapshotByName(name string) *Snapshot // CurrentSnapshot returns the table's current snapshot. CurrentSnapshot() *Snapshot + // Ref returns the snapshot ref for the main branch. + Ref() SnapshotRef + // Refs returns a map of snapshot refs by name. + Refs() iter.Seq2[string, SnapshotRef] Review Comment: We should probably update the comment since it's returning a *list* of name/snapshot ref pairs now, not a map ########## table/metadata.go: ########## @@ -80,20 +92,544 @@ type Metadata interface { SnapshotByName(name string) *Snapshot // CurrentSnapshot returns the table's current snapshot. CurrentSnapshot() *Snapshot + // Ref returns the snapshot ref for the main branch. + Ref() SnapshotRef + // Refs returns a map of snapshot refs by name. + Refs() iter.Seq2[string, SnapshotRef] + // SnapshotLogs returns the list of snapshot logs for the table. + SnapshotLogs() iter.Seq[SnapshotLogEntry] // SortOrder returns the table's current sort order, ie: the one with the // ID that matches the default-sort-order-id. SortOrder() SortOrder // SortOrders returns the list of sort orders in the table. SortOrders() []SortOrder + // DefaultSortOrder returns the ID of the current sort order that writers + // should use by default. + DefaultSortOrder() int // Properties is a string to string map of table properties. This is used // to control settings that affect reading and writing and is not intended // to be used for arbitrary metadata. For example, commit.retry.num-retries // is used to control the number of commit retries. Properties() iceberg.Properties + // PreviousFiles returns the list of metadata log entries for the table. 
+ PreviousFiles() iter.Seq[MetadataLogEntry] Equals(Metadata) bool } +type MetadataBuilder struct { + base Metadata + updates []Update + + // common fields + formatVersion int + uuid uuid.UUID + loc string + lastUpdatedMS int64 + lastColumnId int + schemaList []*iceberg.Schema + currentSchemaID int + specs []iceberg.PartitionSpec + defaultSpecID int + lastPartitionID *int + props iceberg.Properties + snapshotList []Snapshot + currentSnapshotID *int64 + snapshotLog []SnapshotLogEntry + metadataLog []MetadataLogEntry + sortOrderList []SortOrder + defaultSortOrderID int + refs map[string]SnapshotRef + + // V2 specific + lastSequenceNumber *int64 +} + +func NewMetadataBuilder() (*MetadataBuilder, error) { + return &MetadataBuilder{ + updates: make([]Update, 0), + schemaList: make([]*iceberg.Schema, 0), + specs: make([]iceberg.PartitionSpec, 0), + props: make(iceberg.Properties), + snapshotList: make([]Snapshot, 0), + snapshotLog: make([]SnapshotLogEntry, 0), + metadataLog: make([]MetadataLogEntry, 0), + sortOrderList: make([]SortOrder, 0), + refs: make(map[string]SnapshotRef), + }, nil +} + +func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { + b := &MetadataBuilder{} + b.base = metadata + + b.formatVersion = metadata.Version() + b.uuid = metadata.TableUUID() + b.loc = metadata.Location() + b.lastUpdatedMS = metadata.LastUpdatedMillis() + b.lastColumnId = metadata.LastColumnID() + b.schemaList = metadata.Schemas() + b.currentSchemaID = metadata.CurrentSchema().ID + b.specs = metadata.PartitionSpecs() + b.defaultSpecID = metadata.DefaultPartitionSpec() + b.lastPartitionID = metadata.LastPartitionSpecID() + b.props = metadata.Properties() + b.snapshotList = metadata.Snapshots() + b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID + b.sortOrderList = metadata.SortOrders() + b.defaultSortOrderID = metadata.DefaultSortOrder() + + b.refs = make(map[string]SnapshotRef) + for name, ref := range metadata.Refs() { + b.refs[name] = ref + } + + 
b.snapshotLog = make([]SnapshotLogEntry, 0) + for log := range metadata.SnapshotLogs() { + b.snapshotLog = append(b.snapshotLog, log) + } + + b.metadataLog = make([]MetadataLogEntry, 0) + for entry := range metadata.PreviousFiles() { + b.metadataLog = append(b.metadataLog, entry) + } + + return b, nil +} + +func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID int, initial bool) (*MetadataBuilder, error) { + if newLastColumnID < b.lastColumnId { + return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId) + } + + var schemas []*iceberg.Schema + if initial { + schemas = []*iceberg.Schema{schema} + } else { + schemas = append(b.schemaList, schema) + } + + b.lastColumnId = newLastColumnID + b.schemaList = schemas + b.updates = append(b.updates, NewAddSchemaUpdate(schema, newLastColumnID, initial)) + + return b, nil +} + +func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, initial bool) (*MetadataBuilder, error) { + for _, s := range b.specs { + if s.ID() == spec.ID() && !initial { + return nil, fmt.Errorf("partition spec with id %d already exists", spec.ID()) + } + } + + maxFieldID := 0 + for f := range spec.Fields() { + maxFieldID = max(maxFieldID, f.FieldID) + } + + prev := partitionFieldStartID - 1 + if b.lastPartitionID != nil { + prev = *b.lastPartitionID + } + lastPartitionID := max(maxFieldID, prev) + + var specs []iceberg.PartitionSpec + if initial { + specs = []iceberg.PartitionSpec{*spec} + } else { + specs = append(b.specs, *spec) + } + + b.specs = specs + b.lastPartitionID = &lastPartitionID + b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial)) + + return b, nil +} + +func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, error) { + if snapshot == nil { + return nil, nil + } + + if len(b.schemaList) == 0 { + return nil, errors.New("can't add snapshot with no added schemas") + } else if len(b.specs) == 0 { + 
return nil, errors.New("can't add snapshot with no added partition specs") + } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil { + return nil, fmt.Errorf("can't add snapshot with id %d, already exists", snapshot.SnapshotID) + } else if b.formatVersion == 2 && + snapshot.SequenceNumber > 0 && + snapshot.SequenceNumber <= *b.lastSequenceNumber && + snapshot.ParentSnapshotID != nil { + return nil, fmt.Errorf("can't add snapshot with sequence number %d, must be > than last sequence number %d", + snapshot.SequenceNumber, b.lastSequenceNumber) + } + + b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot)) + b.lastUpdatedMS = snapshot.TimestampMs + b.lastSequenceNumber = &snapshot.SequenceNumber + b.snapshotList = append(b.snapshotList, *snapshot) + return b, nil +} + +func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) (*MetadataBuilder, error) { + for _, s := range b.sortOrderList { + if s.OrderID == sortOrder.OrderID && !initial { + return nil, fmt.Errorf("sort order with id %d already exists", sortOrder.OrderID) + } + } Review Comment: if `initial == true` then we can skip this entire loop, right? should we shift the `if initial` case up higher and put this loop only in the `else` case? 
########## table/metadata.go: ########## @@ -370,26 +928,34 @@ func (c *commonMetadata) validate() error { func (c *commonMetadata) Version() int { return c.FormatVersion } -type MetadataV1 struct { - Schema iceberg.Schema `json:"schema"` +type metadataV1 struct { + Schema *iceberg.Schema `json:"schema"` Partition []iceberg.PartitionField `json:"partition-spec"` commonMetadata } -func (m *MetadataV1) Equals(other Metadata) bool { - rhs, ok := other.(*MetadataV1) +func (m *metadataV1) Equals(other Metadata) bool { + rhs, ok := other.(*metadataV1) if !ok { return false } - return m.Schema.Equals(&rhs.Schema) && slices.Equal(m.Partition, rhs.Partition) && + if m == rhs { + return true + } + + if m == nil || rhs == nil { + return false + } Review Comment: what scenario can this happen in? ########## table/metadata.go: ########## @@ -428,34 +994,42 @@ func (m *MetadataV1) UnmarshalJSON(b []byte) error { return m.validate() } -func (m *MetadataV1) ToV2() MetadataV2 { +func (m *metadataV1) ToV2() metadataV2 { commonOut := m.commonMetadata commonOut.FormatVersion = 2 if commonOut.UUID.String() == "" { commonOut.UUID = uuid.New() } - return MetadataV2{commonMetadata: commonOut} + return metadataV2{commonMetadata: commonOut} } -type MetadataV2 struct { - LastSequenceNumber int `json:"last-sequence-number"` +type metadataV2 struct { + LastSequenceNumber int64 `json:"last-sequence-number"` commonMetadata } -func (m *MetadataV2) Equals(other Metadata) bool { - rhs, ok := other.(*MetadataV2) +func (m *metadataV2) Equals(other Metadata) bool { + rhs, ok := other.(*metadataV2) if !ok { return false } + if m == rhs { + return true + } + + if m == nil || rhs == nil { + return false + } Review Comment: can we comment what scenario this could happen in? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org