zeroshade commented on code in PR #356: URL: https://github.com/apache/iceberg-go/pull/356#discussion_r2019218246
########## table/snapshot_producers.go: ########## @@ -89,9 +97,147 @@ func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) { } func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { + // for fast appends, there are no deleted entries return nil, nil } +type overwriteFiles struct { + base *snapshotProducer +} + +func newOverwriteFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &overwriteFiles{base: prod} + + return prod +} + +func (of *overwriteFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + // no post processing + return manifests, nil +} + +func (of *overwriteFiles) existingManifests() ([]iceberg.ManifestFile, error) { + // determine if there are any existing manifest files + existingFiles := make([]iceberg.ManifestFile, 0) + + snap := of.base.txn.meta.currentSnapshot() + if snap == nil { + return existingFiles, nil + } + + manifestList, err := snap.Manifests(of.base.io) + if err != nil { + return existingFiles, err + } + + for _, m := range manifestList { + entries, err := of.base.fetchManifestEntry(m, true) + if err != nil { + return existingFiles, err + } + + foundDeleted := make([]iceberg.ManifestEntry, 0) + notDeleted := make([]iceberg.ManifestEntry, 0, len(entries)) + for _, entry := range entries { + if _, ok := of.base.deletedFiles[entry.DataFile().FilePath()]; ok { + foundDeleted = append(foundDeleted, entry) + } else { + notDeleted = append(notDeleted, entry) + } + } + + if len(foundDeleted) == 0 { + existingFiles = append(existingFiles, m) + + continue + } + + if len(notDeleted) == 0 { + continue + } + + spec, err := of.base.txn.meta.GetSpecByID(int(m.PartitionSpecID())) + if err != nil { + return existingFiles, err + } + + wr, path, counter, err := of.base.newManifestWriter(*spec) + if err != nil { + return existingFiles, err + } + defer counter.W.(io.Closer).Close() + + for _, entry := range notDeleted { + if err := wr.Existing(entry); err != nil { + return existingFiles, err + } + } + + mf, err := wr.ToManifestFile(path, counter.Count) + if err != nil { + return existingFiles, err + } + + existingFiles = append(existingFiles, mf) + } + + return existingFiles, nil +} + +func (of *overwriteFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { + // determine if we need to record any deleted entries + // + // with a full overwrite all the entries are considered deleted + // with partial overwrites we have to use the predicate to evaluate + // which entries are affected + if of.base.parentSnapshotID <= 0 { + return nil, nil + } + + parent, err := of.base.txn.meta.SnapshotByID(of.base.parentSnapshotID) + if err != nil { + return nil, fmt.Errorf("%w: cannot overwrite empty table", err) + } + + previousManifests, err := parent.Manifests(of.base.io) + if err != nil { + return nil, err + } + + getEntries := func(m iceberg.ManifestFile) ([]iceberg.ManifestEntry, error) { + entries, err := of.base.fetchManifestEntry(m, true) + if err != nil { + return nil, err + } + + result := make([]iceberg.ManifestEntry, 0, len(entries)) + for _, entry := range entries { + _, ok := of.base.deletedFiles[entry.DataFile().FilePath()] + if ok && entry.DataFile().ContentType() == iceberg.EntryContentData { + seqNum := entry.SequenceNum() + result = append(result, + iceberg.NewManifestEntry(iceberg.EntryStatusDELETED, + &of.base.snapshotID, &seqNum, entry.FileSequenceNum(), + entry.DataFile())) + } + } + + return result, nil + } + + nWorkers := 5 Review Comment: gah, i forgot that i wanted this to be a config var. I should set that up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org