arnaudbriche commented on code in PR #354: URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007259597
########## table/transaction.go: ########## @@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, snapshotProps iceberg.Prope return iceberg.ErrNotImplemented } +type MergeOp struct { + OutputFile string + InputFiles []string +} + +func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps iceberg.Properties) error { + var ( + inputFiles = make(map[string]bool) + outputFiles = make(map[string]struct{}) + numInputFiles int + numOutputFiles int + ) + + for _, op := range ops { + if len(op.InputFiles) <= 1 { + return fmt.Errorf("merge operation must have at least 2 input files (%d)", len(op.InputFiles)) + } + + outputFiles[op.OutputFile] = struct{}{} + numOutputFiles++ + + for _, f := range op.InputFiles { + inputFiles[f] = false + numInputFiles++ + } + } + + if len(outputFiles) != numOutputFiles { + return errors.New("duplicate output files") + } + + if len(inputFiles) != numInputFiles { + return errors.New("duplicate input files") + } + + s := t.meta.currentSnapshot() + + if s == nil { + return errors.New("merge operation requires an existing snapshot") + } + + manifestFiles, err := s.Manifests(t.tbl.fs) + if err != nil { + return err + } + + var ( + existingManifestFiles []iceberg.ManifestFile // manifest that don't contain any input files + existingDataFiles []iceberg.DataFile // existing data file entries in manifest that contain some input files + deletedManifestEntries []iceberg.ManifestEntry + deletedManifestFiles []iceberg.ManifestFile + ) + + for _, manifestFile := range manifestFiles { + entries, err := manifestFile.FetchEntries(t.tbl.fs, false) + if err != nil { + return err + } + + var ( + isManifestFileTouched bool + untouchedDataFiles []iceberg.DataFile + ) + + for _, entry := range entries { + entry.Status() Review Comment: Weird indeed. 
########## table/snapshot_producers.go: ########## @@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { return nil, nil } +func newMergeFilesProducer( + op Operation, + txn *Transaction, + fs iceio.WriteFileIO, + commitUUID *uuid.UUID, + snapshotProps iceberg.Properties, + existingManifestFiles []iceberg.ManifestFile, + deletedManifestFiles []iceberg.ManifestFile, + deletedManifestEntries []iceberg.ManifestEntry, +) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &mergeFiles{ + base: prod, + existingManifestFiles: existingManifestFiles, + deletedManifestFiles: deletedManifestFiles, + deletedManifestEntries: deletedManifestEntries, + } + + return prod +} + +type mergeFiles struct { + base *snapshotProducer + + existingManifestFiles []iceberg.ManifestFile + deletedManifestFiles []iceberg.ManifestFile + deletedManifestEntries []iceberg.ManifestEntry +} + +func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + var manifestsToKeep []iceberg.ManifestFile + + for _, manifest := range manifests { + var found bool + + for _, deletedManifest := range mf.deletedManifestFiles { + if manifest.FilePath() == deletedManifest.FilePath() { + found = true + + break + } + } + + if !found { + manifestsToKeep = append(manifestsToKeep, manifest) + } + } Review Comment: OK, I'm a bit confused now: I was pretty sure `ManifestContentDeletes` refers to Parquet files containing deleted row masks, not to a manifest listing delete files. I hacked this together without really understanding how the `producerImpl` interface is meant to be used, and I think it would be really beneficial to get some guidance from you.
Here is how I understood it:

```go
type producerImpl interface {
	// Called with the full list of manifests for the snapshot at the end of the process.
	// Is this where I am supposed to get rid of the manifest files containing some merge input files?
	processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error)

	// Why would this differ between implementations of `producerImpl`?
	// Isn't it supposed to be the list of manifests for the parent snapshot every time?
	existingManifests() ([]iceberg.ManifestFile, error)

	// Used to maintain table statistics, but not to remove the manifests containing those entries?
	deletedEntries() ([]iceberg.ManifestEntry, error)
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org