zeroshade commented on code in PR #252:
URL: https://github.com/apache/iceberg-go/pull/252#discussion_r1913882185


##########
table/scanner.go:
##########
@@ -259,129 +286,119 @@ func matchDeletesToData(entry iceberg.ManifestEntry, 
positionalDeletes []iceberg
        return out, nil
 }
 
-func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+// fetchPartitionSpecFilteredManifests retrieves the table's current snapshot,
+// fetches its manifest files, and applies partition-spec filters to remove 
irrelevant manifests.
+func (scan *Scan) fetchPartitionSpecFilteredManifests() 
([]iceberg.ManifestFile, error) {
        snap := scan.Snapshot()
        if snap == nil {
                return nil, nil
        }
 
-       // step 1: filter manifests using partition summaries
-       // the filter depends on the partition spec used to write the manifest 
file
-       // so create a cache of filters for each spec id
-       manifestEvaluators := 
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
+       // Fetch all manifests for the current snapshot.
        manifestList, err := snap.Manifests(scan.io)
        if err != nil {
                return nil, err
        }
 
-       // remove any manifests that we don't need to use
+       // Build per-spec manifest evaluators and filter out irrelevant 
manifests.
+       manifestEvaluators := 
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
        manifestList = slices.DeleteFunc(manifestList, func(mf 
iceberg.ManifestFile) bool {
                eval := manifestEvaluators.Get(int(mf.PartitionSpecID()))
                use, err := eval(mf)
                return !use || err != nil
        })
 
-       // step 2: filter the data files in each manifest
-       // this filter depends on the partition spec used to write the manifest 
file
-       partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+       return manifestList, nil
+}
+
+// collectManifestEntries concurrently opens manifests, applies partition and 
metrics
+// filters, and accumulates both data entries and positional-delete entries.
+func (scan *Scan) collectManifestEntries(
+       ctx context.Context,
+       manifestList []iceberg.ManifestFile,
+) (*manifestEntries, error) {
        metricsEval, err := newInclusiveMetricsEvaluator(
-               scan.metadata.CurrentSchema(), scan.rowFilter, 
scan.caseSensitive, scan.options["include_empty_files"] == "true")
+               scan.metadata.CurrentSchema(),
+               scan.rowFilter,
+               scan.caseSensitive,
+               scan.options["include_empty_files"] == "true",
+       )
        if err != nil {
                return nil, err
        }
 
        minSeqNum := minSequenceNum(manifestList)
-       dataEntries := make([]iceberg.ManifestEntry, 0)
-       positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
-
-       nworkers := min(scan.concurrency, len(manifestList))
-       var wg sync.WaitGroup
-
-       manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
-       entryChan := make(chan []iceberg.ManifestEntry, 20)
-
-       ctx, cancel := context.WithCancelCause(ctx)
-       for i := 0; i < nworkers; i++ {
-               wg.Add(1)
-
-               go func() {
-                       defer wg.Done()
-
-                       for {
-                               select {
-                               case m, ok := <-manifestChan:
-                                       if !ok {
-                                               return
-                                       }
-
-                                       if !scan.checkSequenceNumber(minSeqNum, 
m) {
-                                               continue
-                                       }
-
-                                       entries, err := openManifest(scan.io, m,
-                                               
partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
-                                       if err != nil {
-                                               cancel(err)
-                                               break
-                                       }
-
-                                       entryChan <- entries
-                               case <-ctx.Done():
-                                       return
-                               }
-                       }
-               }()
-       }
+       concurrencyLimit := min(scan.concurrency, len(manifestList))
 
-       go func() {
-               wg.Wait()
-               close(entryChan)
-       }()
+       entries := newManifestEntries()
+       g, _ := errgroup.WithContext(ctx)
+       g.SetLimit(concurrencyLimit)
 
-       for _, m := range manifestList {
-               manifestChan <- m
-       }
-       close(manifestChan)
-
-Loop:
-       for {
-               select {
-               case <-ctx.Done():
-                       return nil, context.Cause(ctx)
-               case entries, ok := <-entryChan:
-                       if !ok {
-                               // closed!
-                               break Loop
+       partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+
+       for _, mf := range manifestList {
+               if !scan.checkSequenceNumber(minSeqNum, mf) {
+                       continue
+               }
+
+               g.Go(func() error {
+                       partEval := 
partitionEvaluators.Get(int(mf.PartitionSpecID()))
+                       manifestEntries, err := openManifest(scan.io, mf, 
partEval, metricsEval)
+                       if err != nil {
+                               return err
                        }
 
-                       for _, e := range entries {
+                       for _, e := range manifestEntries {
                                df := e.DataFile()
                                switch df.ContentType() {
                                case iceberg.EntryContentData:
-                                       dataEntries = append(dataEntries, e)
+                                       entries.addDataEntry(e)
                                case iceberg.EntryContentPosDeletes:
-                                       positionalDeleteEntries = 
append(positionalDeleteEntries, e)
+                                       entries.addPositionalDeleteEntry(e)
                                case iceberg.EntryContentEqDeletes:
-                                       return nil, fmt.Errorf("iceberg-go does 
not yet support equality deletes")
+                                       return fmt.Errorf("iceberg-go does not 
yet support equality deletes")
                                default:
-                                       return nil, fmt.Errorf("%w: unknown 
DataFileContent type (%s): %s",
+                                       return fmt.Errorf("%w: unknown 
DataFileContent type (%s): %s",
                                                ErrInvalidMetadata, 
df.ContentType(), e)
                                }
                        }
-               }
+                       return nil
+               })
+       }

Review Comment:
   Fair enough. This seems reasonable to me and is unlikely to cause any 
issues, so I think we can move forward with this refactor. It might be 
worthwhile to look into adding some benchmarks that measure planning 
performance across various numbers of manifests and manifest entries, so that 
we can keep track of it in the future.
   
   Not something that we need for this particular change, but definitely 
something to look into.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to