zeroshade commented on code in PR #330:
URL: https://github.com/apache/iceberg-go/pull/330#discussion_r1991729503


##########
table/snapshot_producers.go:
##########
@@ -0,0 +1,582 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "maps"
+       "slices"
+       "sync/atomic"
+       "unicode/utf8"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
+       "golang.org/x/sync/errgroup"
+)
+
+type manifestMergeManager struct {
+       targetSizeBytes int
+       minCountToMerge int
+       mergeEnabled    bool
+       snap            *snapshotProducer
+}
+
+func (m *manifestMergeManager) groupBySpec(manifests []iceberg.ManifestFile) 
map[int][]iceberg.ManifestFile {
+       groups := make(map[int][]iceberg.ManifestFile)
+       for _, m := range manifests {
+               specid := int(m.PartitionSpecID())
+               group := groups[specid]
+               groups[specid] = append(group, m)
+       }
+
+       return groups
+}
+
+func (m *manifestMergeManager) createManifest(specID int, bin 
[]iceberg.ManifestFile) (iceberg.ManifestFile, error) {
+       wr, path, counter, err := m.snap.newManifestWriter(m.snap.spec(specID))
+       if err != nil {
+               return nil, err
+       }
+       defer counter.W.(io.Closer).Close()
+
+       for _, manifest := range bin {
+               entries, err := m.snap.fetchManifestEntry(manifest, false)
+               if err != nil {
+                       return nil, err
+               }
+
+               for _, entry := range entries {
+                       switch {
+                       case entry.Status() == iceberg.EntryStatusDELETED && 
entry.SnapshotID() == m.snap.snapshotID:
+                               // only files deleted by this snapshot should 
be added to the new manifest
+                               wr.Delete(entry)
+                       case entry.Status() == iceberg.EntryStatusADDED && 
entry.SnapshotID() == m.snap.snapshotID:
+                               // added entries from this snapshot are still 
added, otherwise they should be existing
+                               wr.Add(entry)
+                       case entry.Status() != iceberg.EntryStatusDELETED:
+                               // add all non-deleted files from the old 
manifest as existing files
+                               wr.Existing(entry)
+                       }
+               }
+       }
+
+       return wr.ToManifestFile(path, counter.Count)
+}
+
+func (m *manifestMergeManager) mergeGroup(firstManifest iceberg.ManifestFile, 
specID int, manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+       packer := internal.SlicePacker[iceberg.ManifestFile]{
+               TargetWeight:    int64(m.targetSizeBytes),
+               Lookback:        1,
+               LargestBinFirst: false,
+       }
+       bins := packer.PackEnd(manifests, func(m iceberg.ManifestFile) int64 {
+               return m.Length()
+       })
+
+       mergeBin := func(bin []iceberg.ManifestFile) ([]iceberg.ManifestFile, 
error) {
+               output := make([]iceberg.ManifestFile, 0, 1)
+               if len(bin) == 1 {
+                       output = append(output, bin[0])
+               } else if len(bin) < m.minCountToMerge && 
slices.ContainsFunc(bin, func(m iceberg.ManifestFile) bool { return m == 
firstManifest }) {
+                       // if the bin has the first manifest (the new data 
files or an appended
+                       // manifest file) then only merge it if the number of 
manifests is above
+                       // the minimum count. this is applied only to bins with 
an in-memory manifest
+                       // so that large manifests don't prevent merging older 
groups
+                       output = append(output, bin...)
+               } else {
+                       created, err := m.createManifest(specID, bin)
+                       if err != nil {
+                               return nil, err
+                       }
+                       output = append(output, created)
+               }
+
+               return output, nil
+       }
+
+       binResults := make([][]iceberg.ManifestFile, len(bins))
+       g := errgroup.Group{}
+       for i, bin := range bins {
+               i, bin := i, bin
+               g.Go(func() error {
+                       var err error
+                       binResults[i], err = mergeBin(bin)
+
+                       return err
+               })
+       }
+
+       if err := g.Wait(); err != nil {
+               return nil, err
+       }
+
+       return slices.Concat(binResults...), nil
+}
+
+func (m *manifestMergeManager) mergeManifests(manifests 
[]iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+       if !m.mergeEnabled || len(manifests) == 0 {
+               return manifests, nil
+       }
+
+       first := manifests[0]
+       groups := m.groupBySpec(manifests)
+
+       merged := make([]iceberg.ManifestFile, 0, len(groups))
+       for _, specID := range 
slices.Backward(slices.Sorted(maps.Keys(groups))) {
+               manifests, err := m.mergeGroup(first, specID, groups[specID])
+               if err != nil {
+                       return nil, err
+               }
+
+               merged = append(merged, manifests...)
+       }
+
+       return merged, nil
+}
+
+type producerImpl interface {
+       processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error)
+       existingManifests() ([]iceberg.ManifestFile, error)
+       deletedEntries() ([]iceberg.ManifestEntry, error)
+}
+
+func newManifestFileName(num int, commit uuid.UUID) string {
+       return fmt.Sprintf("%s-m%d.avro", commit, num)
+}
+
+func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) 
string {
+       // mimics behavior of java
+       // 
https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
+       return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit)
+}
+
+func newFastAppendFilesProducer(op Operation, txn *Transaction, fs 
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) 
*snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &fastAppendFiles{base: prod}
+
+       return prod
+}
+
+type fastAppendFiles struct {
+       base *snapshotProducer
+}
+
+func (fa *fastAppendFiles) processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+       return manifests, nil
+}
+
+func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) 
{
+       existing := make([]iceberg.ManifestFile, 0)
+       if fa.base.parentSnapshotID > 0 {
+               previous, err := 
fa.base.txn.meta.SnapshotByID(fa.base.parentSnapshotID)
+               if err != nil {
+                       return nil, fmt.Errorf("could not find parent snapshot 
%d", fa.base.parentSnapshotID)
+               }
+
+               manifests, err := previous.Manifests(fa.base.io)
+               if err != nil {
+                       return nil, err
+               }
+
+               for _, m := range manifests {
+                       if m.HasAddedFiles() || m.HasExistingFiles() || 
m.SnapshotID() == fa.base.snapshotID {
+                               existing = append(existing, m)
+                       }
+               }
+       }
+
+       return existing, nil
+}
+
+func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) {
+       return nil, nil
+}
+
+type mergeAppendFiles struct {
+       fastAppendFiles
+
+       targetSizeBytes int
+       minCountToMerge int
+       mergeEnabled    bool
+}
+
+func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs 
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) 
*snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &mergeAppendFiles{
+               fastAppendFiles: fastAppendFiles{base: prod},
+               targetSizeBytes: 
snapshotProps.GetInt(ManifestTargetSizeBytesKey, 
ManifestTargetSizeBytesDefault),
+               minCountToMerge: snapshotProps.GetInt(ManifestMinMergeCountKey, 
ManifestMinMergeCountDefault),
+               mergeEnabled:    snapshotProps.GetBool(ManifestMergeEnabledKey, 
ManifestMergeEnabledDefault),
+       }
+
+       return prod
+}
+
+func (m *mergeAppendFiles) processManifests(manifests []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+       unmergedDataManifests, unmergedDeleteManifests := 
[]iceberg.ManifestFile{}, []iceberg.ManifestFile{}
+       for _, m := range manifests {
+               if m.ManifestContent() == iceberg.ManifestContentData {
+                       unmergedDataManifests = append(unmergedDataManifests, m)
+               } else if m.ManifestContent() == iceberg.ManifestContentDeletes 
{
+                       unmergedDeleteManifests = 
append(unmergedDeleteManifests, m)
+               }

Review Comment:
   I was following the pyiceberg example 
[here](https://github.com/apache/iceberg-python/blob/a275ce5941dc85b1721562ea4d2e316e2b36b28e/pyiceberg/table/update/snapshot.py#L144)
 looks like manifests which don't match are just ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to