zeroshade commented on code in PR #354:
URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2006488345


##########
table/transaction.go:
##########
@@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, 
snapshotProps iceberg.Prope
        return iceberg.ErrNotImplemented
 }
 
+type MergeOp struct {
+       OutputFile string
+       InputFiles []string
+}
+
+func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps 
iceberg.Properties) error {
+       var (
+               inputFiles     = make(map[string]bool)
+               outputFiles    = make(map[string]struct{})
+               numInputFiles  int
+               numOutputFiles int
+       )
+
+       for _, op := range ops {
+               if len(op.InputFiles) <= 1 {
+                       return fmt.Errorf("merge operation must have at least 2 
input files (%d)", len(op.InputFiles))
+               }
+
+               outputFiles[op.OutputFile] = struct{}{}
+               numOutputFiles++
+
+               for _, f := range op.InputFiles {
+                       inputFiles[f] = false
+                       numInputFiles++
+               }
+       }
+
+       if len(outputFiles) != numOutputFiles {
+               return errors.New("duplicate output files")
+       }
+
+       if len(inputFiles) != numInputFiles {
+               return errors.New("duplicate input files")
+       }
+
+       s := t.meta.currentSnapshot()
+
+       if s == nil {
+               return errors.New("merge operation requires an existing 
snapshot")
+       }
+
+       manifestFiles, err := s.Manifests(t.tbl.fs)
+       if err != nil {
+               return err
+       }
+
+       var (
+               existingManifestFiles  []iceberg.ManifestFile // manifest that 
don't contain any input files
+               existingDataFiles      []iceberg.DataFile     // existing data 
file entries in manifest that contain some input files
+               deletedManifestEntries []iceberg.ManifestEntry
+               deletedManifestFiles   []iceberg.ManifestFile
+       )
+
+       for _, manifestFile := range manifestFiles {
+               entries, err := manifestFile.FetchEntries(t.tbl.fs, false)
+               if err != nil {
+                       return err
+               }
+
+               var (
+                       isManifestFileTouched bool
+                       untouchedDataFiles    []iceberg.DataFile
+               )
+
+               for _, entry := range entries {
+                       entry.Status()

Review Comment:
   This line does nothing: the result of `entry.Status()` is discarded. Is 
something missing?



##########
table/transaction.go:
##########
@@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, 
snapshotProps iceberg.Prope
        return iceberg.ErrNotImplemented
 }
 
+type MergeOp struct {
+       OutputFile string
+       InputFiles []string
+}
+
+func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps 
iceberg.Properties) error {
+       var (
+               inputFiles     = make(map[string]bool)
+               outputFiles    = make(map[string]struct{})
+               numInputFiles  int
+               numOutputFiles int
+       )
+
+       for _, op := range ops {
+               if len(op.InputFiles) <= 1 {
+                       return fmt.Errorf("merge operation must have at least 2 
input files (%d)", len(op.InputFiles))
+               }
+
+               outputFiles[op.OutputFile] = struct{}{}
+               numOutputFiles++
+
+               for _, f := range op.InputFiles {
+                       inputFiles[f] = false
+                       numInputFiles++
+               }
+       }
+
+       if len(outputFiles) != numOutputFiles {
+               return errors.New("duplicate output files")
+       }
+
+       if len(inputFiles) != numInputFiles {
+               return errors.New("duplicate input files")
+       }
+
+       s := t.meta.currentSnapshot()
+
+       if s == nil {
+               return errors.New("merge operation requires an existing 
snapshot")
+       }
+
+       manifestFiles, err := s.Manifests(t.tbl.fs)
+       if err != nil {
+               return err
+       }
+
+       var (
+               existingManifestFiles  []iceberg.ManifestFile // manifest that 
don't contain any input files
+               existingDataFiles      []iceberg.DataFile     // existing data 
file entries in manifest that contain some input files
+               deletedManifestEntries []iceberg.ManifestEntry
+               deletedManifestFiles   []iceberg.ManifestFile
+       )
+
+       for _, manifestFile := range manifestFiles {
+               entries, err := manifestFile.FetchEntries(t.tbl.fs, false)
+               if err != nil {
+                       return err
+               }
+
+               var (
+                       isManifestFileTouched bool
+                       untouchedDataFiles    []iceberg.DataFile
+               )
+
+               for _, entry := range entries {
+                       entry.Status()
+
+                       _, found := inputFiles[entry.DataFile().FilePath()]
+
+                       if !found {
+                               untouchedDataFiles = append(untouchedDataFiles, 
entry.DataFile())
+                       } else {
+                               inputFiles[entry.DataFile().FilePath()] = true
+                               isManifestFileTouched = true
+
+                               deletedManifestEntries = 
append(deletedManifestEntries, iceberg.NewManifestEntry(
+                                       iceberg.EntryStatusDELETED,
+                                       internal.ToPtr(entry.SnapshotID()),
+                                       internal.ToPtr(entry.SequenceNum()),
+                                       entry.FileSequenceNum(),
+                                       entry.DataFile(),
+                               ))
+                       }
+               }
+
+               if !isManifestFileTouched {
+                       existingManifestFiles = append(existingManifestFiles, 
manifestFile)
+               } else {
+                       deletedManifestFiles = append(deletedManifestFiles, 
manifestFile)
+                       existingDataFiles = append(existingDataFiles, 
untouchedDataFiles...)
+
+               }
+       }
+
+       for f, found := range inputFiles {
+               if !found {
+                       return fmt.Errorf("input file %s not found in any 
manifest", f)
+               }
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: 
string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       updater := t.updateSnapshot(snapshotProps).merge(existingManifestFiles, 
deletedManifestFiles, deletedManifestEntries)
+
+       outputDataFiles := parquetFilesToDataFiles(t.tbl.fs, t.meta, 
maps.Keys(outputFiles))

Review Comment:
   Why are we re-processing all of the files? We already have manifest entries 
for them, so we don't need to re-read and process them to create new entries. 
We can just use the existing entries, right?



##########
table/snapshot_producers.go:
##########
@@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries() 
([]iceberg.ManifestEntry, error) {
        return nil, nil
 }
 
+func newMergeFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+       existingManifestFiles []iceberg.ManifestFile,
+       deletedManifestFiles []iceberg.ManifestFile,
+       deletedManifestEntries []iceberg.ManifestEntry,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &mergeFiles{
+               base:                   prod,
+               existingManifestFiles:  existingManifestFiles,
+               deletedManifestFiles:   deletedManifestFiles,
+               deletedManifestEntries: deletedManifestEntries,
+       }
+
+       return prod
+}
+
+type mergeFiles struct {
+       base *snapshotProducer
+
+       existingManifestFiles  []iceberg.ManifestFile
+       deletedManifestFiles   []iceberg.ManifestFile
+       deletedManifestEntries []iceberg.ManifestEntry
+}
+
+func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+       var manifestsToKeep []iceberg.ManifestFile
+
+       for _, manifest := range manifests {
+               var found bool
+
+               for _, deletedManifest := range mf.deletedManifestFiles {
+                       if manifest.FilePath() == deletedManifest.FilePath() {
+                               found = true
+
+                               break
+                       }
+               }
+
+               if !found {
+                       manifestsToKeep = append(manifestsToKeep, manifest)
+               }
+       }

Review Comment:
   Instead of spinning through multiple slices and having to pass in the 
original set of deleted manifests, why not do:
   
   ```go
   unmergedDataManifests, unmergedDeleteManifests := []iceberg.ManifestFile{}, 
[]iceberg.ManifestFile{}
   for _, m := range manifests {
           switch m.ManifestContent() {
           case iceberg.ManifestContentData:
                   unmergedDataManifests = append(unmergedDataManifests, m)
           case iceberg.ManifestContentDeletes:
                   unmergedDeleteManifests = append(unmergedDeleteManifests, m)
           }
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to