arnaudbriche commented on code in PR #354:
URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007259597


##########
table/transaction.go:
##########
@@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, 
snapshotProps iceberg.Prope
        return iceberg.ErrNotImplemented
 }
 
+type MergeOp struct {
+       OutputFile string
+       InputFiles []string
+}
+
+func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps 
iceberg.Properties) error {
+       var (
+               inputFiles     = make(map[string]bool)
+               outputFiles    = make(map[string]struct{})
+               numInputFiles  int
+               numOutputFiles int
+       )
+
+       for _, op := range ops {
+               if len(op.InputFiles) <= 1 {
+                       return fmt.Errorf("merge operation must have at least 2 
input files (%d)", len(op.InputFiles))
+               }
+
+               outputFiles[op.OutputFile] = struct{}{}
+               numOutputFiles++
+
+               for _, f := range op.InputFiles {
+                       inputFiles[f] = false
+                       numInputFiles++
+               }
+       }
+
+       if len(outputFiles) != numOutputFiles {
+               return errors.New("duplicate output files")
+       }
+
+       if len(inputFiles) != numInputFiles {
+               return errors.New("duplicate input files")
+       }
+
+       s := t.meta.currentSnapshot()
+
+       if s == nil {
+               return errors.New("merge operation requires an existing 
snapshot")
+       }
+
+       manifestFiles, err := s.Manifests(t.tbl.fs)
+       if err != nil {
+               return err
+       }
+
+       var (
+               existingManifestFiles  []iceberg.ManifestFile // manifest that 
don't contain any input files
+               existingDataFiles      []iceberg.DataFile     // existing data 
file entries in manifest that contain some input files
+               deletedManifestEntries []iceberg.ManifestEntry
+               deletedManifestFiles   []iceberg.ManifestFile
+       )
+
+       for _, manifestFile := range manifestFiles {
+               entries, err := manifestFile.FetchEntries(t.tbl.fs, false)
+               if err != nil {
+                       return err
+               }
+
+               var (
+                       isManifestFileTouched bool
+                       untouchedDataFiles    []iceberg.DataFile
+               )
+
+               for _, entry := range entries {
+                       entry.Status()

Review Comment:
   Weird indeed.



##########
table/snapshot_producers.go:
##########
@@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries() 
([]iceberg.ManifestEntry, error) {
        return nil, nil
 }
 
+func newMergeFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+       existingManifestFiles []iceberg.ManifestFile,
+       deletedManifestFiles []iceberg.ManifestFile,
+       deletedManifestEntries []iceberg.ManifestEntry,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &mergeFiles{
+               base:                   prod,
+               existingManifestFiles:  existingManifestFiles,
+               deletedManifestFiles:   deletedManifestFiles,
+               deletedManifestEntries: deletedManifestEntries,
+       }
+
+       return prod
+}
+
+type mergeFiles struct {
+       base *snapshotProducer
+
+       existingManifestFiles  []iceberg.ManifestFile
+       deletedManifestFiles   []iceberg.ManifestFile
+       deletedManifestEntries []iceberg.ManifestEntry
+}
+
+func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+       var manifestsToKeep []iceberg.ManifestFile
+
+       for _, manifest := range manifests {
+               var found bool
+
+               for _, deletedManifest := range mf.deletedManifestFiles {
+                       if manifest.FilePath() == deletedManifest.FilePath() {
+                               found = true
+
+                               break
+                       }
+               }
+
+               if !found {
+                       manifestsToKeep = append(manifestsToKeep, manifest)
+               }
+       }

Review Comment:
   Ok, I'm a bit confused now; I was pretty sure `ManifestContentDeletes` 
refers to parquet files containing deleted row masks, not a manifest containing 
a list of delete files.
   
   I hacked this together without really understanding the way the 
`producerImpl` interface is meant to be used, and I think it would be really 
beneficial to get some intel from you.
   
   Here is how I understood it:
   
   ```go
   type producerImpl interface {
           // called with the full list of manifests for the snapshot at the end 
of the process
           // is it here that I am supposed to get rid of the manifest files 
containing some merge input files?
        processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error)
           // why would it differ by implementation of `producerImpl`?
           // isn't it supposed to be the list of manifests for the parent 
snapshot every time?
        existingManifests() ([]iceberg.ManifestFile, error)
           // used to maintain table statistics but not to remove the manifests 
containing those entries?
        deletedEntries() ([]iceberg.ManifestEntry, error)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to