zeroshade commented on code in PR #354: URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2006488345
########## table/transaction.go: ########## @@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, snapshotProps iceberg.Prope return iceberg.ErrNotImplemented } +type MergeOp struct { + OutputFile string + InputFiles []string +} + +func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps iceberg.Properties) error { + var ( + inputFiles = make(map[string]bool) + outputFiles = make(map[string]struct{}) + numInputFiles int + numOutputFiles int + ) + + for _, op := range ops { + if len(op.InputFiles) <= 1 { + return fmt.Errorf("merge operation must have at least 2 input files (%d)", len(op.InputFiles)) + } + + outputFiles[op.OutputFile] = struct{}{} + numOutputFiles++ + + for _, f := range op.InputFiles { + inputFiles[f] = false + numInputFiles++ + } + } + + if len(outputFiles) != numOutputFiles { + return errors.New("duplicate output files") + } + + if len(inputFiles) != numInputFiles { + return errors.New("duplicate input files") + } + + s := t.meta.currentSnapshot() + + if s == nil { + return errors.New("merge operation requires an existing snapshot") + } + + manifestFiles, err := s.Manifests(t.tbl.fs) + if err != nil { + return err + } + + var ( + existingManifestFiles []iceberg.ManifestFile // manifest that don't contain any input files + existingDataFiles []iceberg.DataFile // existing data file entries in manifest that contain some input files + deletedManifestEntries []iceberg.ManifestEntry + deletedManifestFiles []iceberg.ManifestFile + ) + + for _, manifestFile := range manifestFiles { + entries, err := manifestFile.FetchEntries(t.tbl.fs, false) + if err != nil { + return err + } + + var ( + isManifestFileTouched bool + untouchedDataFiles []iceberg.DataFile + ) + + for _, entry := range entries { + entry.Status() Review Comment: this line does nothing? Is something missing? ########## table/transaction.go: ########## @@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, snapshotProps iceberg.Prope return iceberg.ErrNotImplemented } +type MergeOp struct { + OutputFile string + InputFiles []string +} + +func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps iceberg.Properties) error { + var ( + inputFiles = make(map[string]bool) + outputFiles = make(map[string]struct{}) + numInputFiles int + numOutputFiles int + ) + + for _, op := range ops { + if len(op.InputFiles) <= 1 { + return fmt.Errorf("merge operation must have at least 2 input files (%d)", len(op.InputFiles)) + } + + outputFiles[op.OutputFile] = struct{}{} + numOutputFiles++ + + for _, f := range op.InputFiles { + inputFiles[f] = false + numInputFiles++ + } + } + + if len(outputFiles) != numOutputFiles { + return errors.New("duplicate output files") + } + + if len(inputFiles) != numInputFiles { + return errors.New("duplicate input files") + } + + s := t.meta.currentSnapshot() + + if s == nil { + return errors.New("merge operation requires an existing snapshot") + } + + manifestFiles, err := s.Manifests(t.tbl.fs) + if err != nil { + return err + } + + var ( + existingManifestFiles []iceberg.ManifestFile // manifest that don't contain any input files + existingDataFiles []iceberg.DataFile // existing data file entries in manifest that contain some input files + deletedManifestEntries []iceberg.ManifestEntry + deletedManifestFiles []iceberg.ManifestFile + ) + + for _, manifestFile := range manifestFiles { + entries, err := manifestFile.FetchEntries(t.tbl.fs, false) + if err != nil { + return err + } + + var ( + isManifestFileTouched bool + untouchedDataFiles []iceberg.DataFile + ) + + for _, entry := range entries { + entry.Status() + + _, found := inputFiles[entry.DataFile().FilePath()] + + if !found { + untouchedDataFiles = append(untouchedDataFiles, entry.DataFile()) + } else { + inputFiles[entry.DataFile().FilePath()] = true + isManifestFileTouched = true + + deletedManifestEntries = append(deletedManifestEntries, iceberg.NewManifestEntry( + iceberg.EntryStatusDELETED, + internal.ToPtr(entry.SnapshotID()), + internal.ToPtr(entry.SequenceNum()), + entry.FileSequenceNum(), + entry.DataFile(), + )) + } + } + + if !isManifestFileTouched { + existingManifestFiles = append(existingManifestFiles, manifestFile) + } else { + deletedManifestFiles = append(deletedManifestFiles, manifestFile) + existingDataFiles = append(existingDataFiles, untouchedDataFiles...) + + } + } + + for f, found := range inputFiles { + if !found { + return fmt.Errorf("input file %s not found in any manifest", f) + } + } + + if t.meta.NameMapping() == nil { + nameMapping := t.meta.CurrentSchema().NameMapping() + mappingJson, err := json.Marshal(nameMapping) + if err != nil { + return err + } + err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)}) + if err != nil { + return err + } + } + + updater := t.updateSnapshot(snapshotProps).merge(existingManifestFiles, deletedManifestFiles, deletedManifestEntries) + + outputDataFiles := parquetFilesToDataFiles(t.tbl.fs, t.meta, maps.Keys(outputFiles)) Review Comment: why are we re-processing all of the files? We have manifest entries for them already, we don't need to re-read and process them to create new entries. We can just use the existing entries, right? ########## table/snapshot_producers.go: ########## @@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { return nil, nil } +func newMergeFilesProducer( + op Operation, + txn *Transaction, + fs iceio.WriteFileIO, + commitUUID *uuid.UUID, + snapshotProps iceberg.Properties, + existingManifestFiles []iceberg.ManifestFile, + deletedManifestFiles []iceberg.ManifestFile, + deletedManifestEntries []iceberg.ManifestEntry, +) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &mergeFiles{ + base: prod, + existingManifestFiles: existingManifestFiles, + deletedManifestFiles: deletedManifestFiles, + deletedManifestEntries: deletedManifestEntries, + } + + return prod +} + +type mergeFiles struct { + base *snapshotProducer + + existingManifestFiles []iceberg.ManifestFile + deletedManifestFiles []iceberg.ManifestFile + deletedManifestEntries []iceberg.ManifestEntry +} + +func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + var manifestsToKeep []iceberg.ManifestFile + + for _, manifest := range manifests { + var found bool + + for _, deletedManifest := range mf.deletedManifestFiles { + if manifest.FilePath() == deletedManifest.FilePath() { + found = true + + break + } + } + + if !found { + manifestsToKeep = append(manifestsToKeep, manifest) + } + } Review Comment: instead of spinning through multiple slices and having to pass in the original set of deleted manifests, why not do: ```go unmergedDataManifests, unmergedDeleteManifests := []iceberg.ManifestFile{}, []iceberg.ManifestFile{} for _, m := range manifests { switch m.ManifestContent() { case iceberg.ManifestContentData: unmergedDataManifests = append(unmergedDataManifests, m) case iceberg.ManifestContentDeletes: unmergedDeleteManifests = append(unmergedDeleteManifests, m) } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org