arnaudbriche commented on code in PR #354: URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007259086
########## table/transaction.go: ########## @@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, snapshotProps iceberg.Prope return iceberg.ErrNotImplemented } +type MergeOp struct { + OutputFile string + InputFiles []string +} + +func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps iceberg.Properties) error { + var ( + inputFiles = make(map[string]bool) + outputFiles = make(map[string]struct{}) + numInputFiles int + numOutputFiles int + ) + + for _, op := range ops { + if len(op.InputFiles) <= 1 { + return fmt.Errorf("merge operation must have at least 2 input files (%d)", len(op.InputFiles)) + } + + outputFiles[op.OutputFile] = struct{}{} + numOutputFiles++ + + for _, f := range op.InputFiles { + inputFiles[f] = false + numInputFiles++ + } + } + + if len(outputFiles) != numOutputFiles { + return errors.New("duplicate output files") + } + + if len(inputFiles) != numInputFiles { + return errors.New("duplicate input files") + } + + s := t.meta.currentSnapshot() + + if s == nil { + return errors.New("merge operation requires an existing snapshot") + } + + manifestFiles, err := s.Manifests(t.tbl.fs) + if err != nil { + return err + } + + var ( + existingManifestFiles []iceberg.ManifestFile // manifest that don't contain any input files + existingDataFiles []iceberg.DataFile // existing data file entries in manifest that contain some input files + deletedManifestEntries []iceberg.ManifestEntry + deletedManifestFiles []iceberg.ManifestFile + ) + + for _, manifestFile := range manifestFiles { + entries, err := manifestFile.FetchEntries(t.tbl.fs, false) + if err != nil { + return err + } + + var ( + isManifestFileTouched bool + untouchedDataFiles []iceberg.DataFile + ) + + for _, entry := range entries { + entry.Status() + + _, found := inputFiles[entry.DataFile().FilePath()] + + if !found { + untouchedDataFiles = append(untouchedDataFiles, entry.DataFile()) + } else { + inputFiles[entry.DataFile().FilePath()] = true + isManifestFileTouched 
= true + + deletedManifestEntries = append(deletedManifestEntries, iceberg.NewManifestEntry( + iceberg.EntryStatusDELETED, + internal.ToPtr(entry.SnapshotID()), + internal.ToPtr(entry.SequenceNum()), + entry.FileSequenceNum(), + entry.DataFile(), + )) + } + } + + if !isManifestFileTouched { + existingManifestFiles = append(existingManifestFiles, manifestFile) + } else { + deletedManifestFiles = append(deletedManifestFiles, manifestFile) + existingDataFiles = append(existingDataFiles, untouchedDataFiles...) + + } + } + + for f, found := range inputFiles { + if !found { + return fmt.Errorf("input file %s not found in any manifest", f) + } + } + + if t.meta.NameMapping() == nil { + nameMapping := t.meta.CurrentSchema().NameMapping() + mappingJson, err := json.Marshal(nameMapping) + if err != nil { + return err + } + err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: string(mappingJson)}) + if err != nil { + return err + } + } + + updater := t.updateSnapshot(snapshotProps).merge(existingManifestFiles, deletedManifestFiles, deletedManifestEntries) + + outputDataFiles := parquetFilesToDataFiles(t.tbl.fs, t.meta, maps.Keys(outputFiles)) Review Comment: I don't understand what you mean here. `outputFiles` is a list of newly created Parquet files resulting from the merge operation, so we don't have existing entries. Maybe you are talking about something else? I can hook it up to have the working feature then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org