zeroshade commented on code in PR #354:
URL: https://github.com/apache/iceberg-go/pull/354#discussion_r2007925104


##########
table/snapshot_producers.go:
##########
@@ -92,6 +92,65 @@ func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
        return nil, nil
 }
 
+func newMergeFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+       existingManifestFiles []iceberg.ManifestFile,
+       deletedManifestFiles []iceberg.ManifestFile,
+       deletedManifestEntries []iceberg.ManifestEntry,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &mergeFiles{
+               base:                   prod,
+               existingManifestFiles:  existingManifestFiles,
+               deletedManifestFiles:   deletedManifestFiles,
+               deletedManifestEntries: deletedManifestEntries,
+       }
+
+       return prod
+}
+
+type mergeFiles struct {
+       base *snapshotProducer
+
+       existingManifestFiles  []iceberg.ManifestFile
+       deletedManifestFiles   []iceberg.ManifestFile
+       deletedManifestEntries []iceberg.ManifestEntry
+}
+
+func (mf *mergeFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+       var manifestsToKeep []iceberg.ManifestFile
+
+       for _, manifest := range manifests {
+               var found bool
+
+               for _, deletedManifest := range mf.deletedManifestFiles {
+                       if manifest.FilePath() == deletedManifest.FilePath() {
+                               found = true
+
+                               break
+                       }
+               }
+
+               if !found {
+                       manifestsToKeep = append(manifestsToKeep, manifest)
+               }
+       }

Review Comment:
   > Also, the order in which these methods are called is:
   
   Not quite:
   
   `existingManifests` and `deletedEntries` are called in parallel by separate goroutines.
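   To make the concurrency concrete, here is a minimal sketch of two hooks being run in separate goroutines and joined before their results are used. The `producerImpl` interface, `fakeImpl`, and `runInParallel` are hypothetical simplifications (manifests and entries are plain strings), not the actual iceberg-go types, and the sketch uses `sync.WaitGroup` without claiming that is what the real producer uses.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// producerImpl is a hypothetical, trimmed-down stand-in for the hooks a
// snapshot producer calls; manifests and entries are plain strings here.
type producerImpl interface {
	existingManifests() ([]string, error)
	deletedEntries() ([]string, error)
}

// runInParallel calls both hooks in separate goroutines and waits for both.
// If either fails, the first error (existing checked before deleted) is returned.
func runInParallel(impl producerImpl) (existing, deleted []string, err error) {
	var (
		wg          sync.WaitGroup
		existingErr error
		deletedErr  error
	)

	wg.Add(2)
	go func() {
		defer wg.Done()
		existing, existingErr = impl.existingManifests()
	}()
	go func() {
		defer wg.Done()
		deleted, deletedErr = impl.deletedEntries()
	}()
	wg.Wait() // both writes above are visible after Wait returns

	if existingErr != nil {
		return nil, nil, fmt.Errorf("existing manifests: %w", existingErr)
	}
	if deletedErr != nil {
		return nil, nil, fmt.Errorf("deleted entries: %w", deletedErr)
	}
	return existing, deleted, nil
}

// fakeImpl is a hypothetical implementation used only for illustration.
type fakeImpl struct{ failDeleted bool }

func (f fakeImpl) existingManifests() ([]string, error) {
	return []string{"m1.avro", "m2.avro"}, nil
}

func (f fakeImpl) deletedEntries() ([]string, error) {
	if f.failDeleted {
		return nil, errors.New("boom")
	}
	return []string{"data-3.parquet"}, nil
}
```

   Each goroutine writes to its own variables, and `wg.Wait()` guarantees those writes happen before they are read, so no extra locking is needed.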
   
   You are correct that in *most* cases, `existingManifests` will simply be all the manifests of the current snapshot (more specifically, all the manifests that have "Added Files" or "Existing Files", or that were added by the parent snapshot). But in the case of an overwrite operation, for example, we rewrite the "existing manifests" without the deleted data files.
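   Roughly, the overwrite path for existing manifests could look like the sketch below. It assumes hypothetical `manifest` and `entry` structs keyed by data file path; a real implementation would write a new manifest file instead of appending `.rewritten` to the path, which is only a placeholder.

```go
package main

// entry is a hypothetical stand-in for a manifest entry: only the data
// file path matters for this sketch.
type entry struct {
	DataFilePath string
}

// manifest is a hypothetical stand-in for a manifest file and its entries.
type manifest struct {
	Path    string
	Entries []entry
}

// rewriteExisting returns the manifests to carry into the new snapshot.
// Manifests that reference no deleted data file are reused unchanged;
// manifests that do are replaced by a copy without those entries.
func rewriteExisting(manifests []manifest, deletedFiles map[string]struct{}) []manifest {
	out := make([]manifest, 0, len(manifests))
	for _, m := range manifests {
		kept := make([]entry, 0, len(m.Entries))
		for _, e := range m.Entries {
			if _, deleted := deletedFiles[e.DataFilePath]; !deleted {
				kept = append(kept, e)
			}
		}

		if len(kept) == len(m.Entries) {
			// Nothing in this manifest was deleted: reuse it as-is.
			out = append(out, m)
			continue
		}

		// Some entries were deleted: a real implementation would write a new
		// manifest file here; the ".rewritten" suffix is only a placeholder.
		out = append(out, manifest{Path: m.Path + ".rewritten", Entries: kept})
	}
	return out
}
```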
   
   `deletedEntries` is specifically for processing the *new* deleted entries: it generates new manifest entries with a status of Deleted.
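   A sketch of what generating those entries could look like. The status values follow the Iceberg spec's manifest entry `status` field (0 = EXISTING, 1 = ADDED, 2 = DELETED), and the spec records the deleting snapshot's ID on DELETED entries; the struct, constant, and function names here are hypothetical.

```go
package main

// entryStatus mirrors the manifest entry status values from the Iceberg
// spec; the Go names are hypothetical.
type entryStatus int

const (
	statusExisting entryStatus = iota // 0
	statusAdded                       // 1
	statusDeleted                     // 2
)

// manifestEntry is a hypothetical, trimmed-down manifest entry.
type manifestEntry struct {
	Status       entryStatus
	SnapshotID   int64
	DataFilePath string
}

// deletedEntries emits one new DELETED entry, stamped with the committing
// snapshot's ID, for every current entry whose data file is being removed.
func deletedEntries(current []manifestEntry, toDelete map[string]struct{}, snapshotID int64) []manifestEntry {
	var out []manifestEntry
	for _, e := range current {
		if _, ok := toDelete[e.DataFilePath]; ok {
			out = append(out, manifestEntry{
				Status:       statusDeleted,
				SnapshotID:   snapshotID,
				DataFilePath: e.DataFilePath,
			})
		}
	}
	return out
}
```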
   
   > So it seems like I will always have to process all manifests twice: once before (or inside) deletedEntries, and once in existingManifests.
   
   For an overwrite operation, yes. We're essentially looping through the manifests twice (in parallel, not serially) so that we can process the existing and deleted entries simultaneously in separate goroutines. There may be a better way to handle this that we can look into in the future, but for now, since the two passes run concurrently, I think we're fine.
   
   `processManifests` is only needed if you have to post-process the final list of manifests before they get written. It is only called on the final concatenated list of ALL the added, existing, and delete manifests.
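   Related to the nested loop in the diff above: if `processManifests` ends up filtering by path, building a set of the deleted paths first avoids comparing every manifest against every deleted manifest. A minimal sketch, assuming a hypothetical `manifestFile` interface that mirrors only the `FilePath()` method of `iceberg.ManifestFile`:

```go
package main

// manifestFile is a hypothetical stand-in for iceberg.ManifestFile; only
// FilePath is needed here.
type manifestFile interface {
	FilePath() string
}

// pathManifest is a trivial hypothetical implementation for illustration.
type pathManifest string

func (p pathManifest) FilePath() string { return string(p) }

// dropDeletedManifests returns the manifests whose paths do not appear in
// deleted, preserving input order. Building the set first makes the filter
// expected O(len(manifests) + len(deleted)) instead of
// O(len(manifests) * len(deleted)).
func dropDeletedManifests(manifests, deleted []manifestFile) []manifestFile {
	deletedPaths := make(map[string]struct{}, len(deleted))
	for _, d := range deleted {
		deletedPaths[d.FilePath()] = struct{}{}
	}

	kept := make([]manifestFile, 0, len(manifests))
	for _, m := range manifests {
		if _, isDeleted := deletedPaths[m.FilePath()]; !isDeleted {
			kept = append(kept, m)
		}
	}
	return kept
}
```

   The result matches the nested-loop version (same manifests kept, same order); only the lookup strategy changes.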



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

