arnaudbriche commented on code in PR #354:
URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007259086


##########
table/transaction.go:
##########
@@ -121,6 +140,140 @@ func (t *Transaction) Append(rdr array.RecordReader, 
snapshotProps iceberg.Prope
        return iceberg.ErrNotImplemented
 }
 
+type MergeOp struct {
+       OutputFile string
+       InputFiles []string
+}
+
+func (t *Transaction) MergeFiles(ops []MergeOp, snapshotProps 
iceberg.Properties) error {
+       var (
+               inputFiles     = make(map[string]bool)
+               outputFiles    = make(map[string]struct{})
+               numInputFiles  int
+               numOutputFiles int
+       )
+
+       for _, op := range ops {
+               if len(op.InputFiles) <= 1 {
+                       return fmt.Errorf("merge operation must have at least 2 
input files (%d)", len(op.InputFiles))
+               }
+
+               outputFiles[op.OutputFile] = struct{}{}
+               numOutputFiles++
+
+               for _, f := range op.InputFiles {
+                       inputFiles[f] = false
+                       numInputFiles++
+               }
+       }
+
+       if len(outputFiles) != numOutputFiles {
+               return errors.New("duplicate output files")
+       }
+
+       if len(inputFiles) != numInputFiles {
+               return errors.New("duplicate input files")
+       }
+
+       s := t.meta.currentSnapshot()
+
+       if s == nil {
+               return errors.New("merge operation requires an existing 
snapshot")
+       }
+
+       manifestFiles, err := s.Manifests(t.tbl.fs)
+       if err != nil {
+               return err
+       }
+
+       var (
+               existingManifestFiles  []iceberg.ManifestFile // manifest that 
don't contain any input files
+               existingDataFiles      []iceberg.DataFile     // existing data 
file entries in manifest that contain some input files
+               deletedManifestEntries []iceberg.ManifestEntry
+               deletedManifestFiles   []iceberg.ManifestFile
+       )
+
+       for _, manifestFile := range manifestFiles {
+               entries, err := manifestFile.FetchEntries(t.tbl.fs, false)
+               if err != nil {
+                       return err
+               }
+
+               var (
+                       isManifestFileTouched bool
+                       untouchedDataFiles    []iceberg.DataFile
+               )
+
+               for _, entry := range entries {
+                       entry.Status()
+
+                       _, found := inputFiles[entry.DataFile().FilePath()]
+
+                       if !found {
+                               untouchedDataFiles = append(untouchedDataFiles, 
entry.DataFile())
+                       } else {
+                               inputFiles[entry.DataFile().FilePath()] = true
+                               isManifestFileTouched = true
+
+                               deletedManifestEntries = 
append(deletedManifestEntries, iceberg.NewManifestEntry(
+                                       iceberg.EntryStatusDELETED,
+                                       internal.ToPtr(entry.SnapshotID()),
+                                       internal.ToPtr(entry.SequenceNum()),
+                                       entry.FileSequenceNum(),
+                                       entry.DataFile(),
+                               ))
+                       }
+               }
+
+               if !isManifestFileTouched {
+                       existingManifestFiles = append(existingManifestFiles, 
manifestFile)
+               } else {
+                       deletedManifestFiles = append(deletedManifestFiles, 
manifestFile)
+                       existingDataFiles = append(existingDataFiles, 
untouchedDataFiles...)
+
+               }
+       }
+
+       for f, found := range inputFiles {
+               if !found {
+                       return fmt.Errorf("input file %s not found in any 
manifest", f)
+               }
+       }
+
+       if t.meta.NameMapping() == nil {
+               nameMapping := t.meta.CurrentSchema().NameMapping()
+               mappingJson, err := json.Marshal(nameMapping)
+               if err != nil {
+                       return err
+               }
+               err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey: 
string(mappingJson)})
+               if err != nil {
+                       return err
+               }
+       }
+
+       updater := t.updateSnapshot(snapshotProps).merge(existingManifestFiles, 
deletedManifestFiles, deletedManifestEntries)
+
+       outputDataFiles := parquetFilesToDataFiles(t.tbl.fs, t.meta, 
maps.Keys(outputFiles))

Review Comment:
   I don't understand whar you mean here.
   `outputFiles` is a list of newly created Parquet files resulting from the 
merge operation, so we don't have existing entries.
   Maybe you are talking about some else ? I can hook it up to have the working 
feature then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to