zeroshade commented on code in PR #330: URL: https://github.com/apache/iceberg-go/pull/330#discussion_r1991729503
########## table/snapshot_producers.go: ########## @@ -0,0 +1,582 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "bytes" + "fmt" + "io" + "maps" + "slices" + "sync/atomic" + "unicode/utf8" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/internal" + iceio "github.com/apache/iceberg-go/io" + "github.com/google/uuid" + "golang.org/x/sync/errgroup" +) + +type manifestMergeManager struct { + targetSizeBytes int + minCountToMerge int + mergeEnabled bool + snap *snapshotProducer +} + +func (m *manifestMergeManager) groupBySpec(manifests []iceberg.ManifestFile) map[int][]iceberg.ManifestFile { + groups := make(map[int][]iceberg.ManifestFile) + for _, m := range manifests { + specid := int(m.PartitionSpecID()) + group := groups[specid] + groups[specid] = append(group, m) + } + + return groups +} + +func (m *manifestMergeManager) createManifest(specID int, bin []iceberg.ManifestFile) (iceberg.ManifestFile, error) { + wr, path, counter, err := m.snap.newManifestWriter(m.snap.spec(specID)) + if err != nil { + return nil, err + } + defer counter.W.(io.Closer).Close() + + for _, manifest := range bin { + entries, err := m.snap.fetchManifestEntry(manifest, false) + if err != nil 
{ + return nil, err + } + + for _, entry := range entries { + switch { + case entry.Status() == iceberg.EntryStatusDELETED && entry.SnapshotID() == m.snap.snapshotID: + // only files deleted by this snapshot should be added to the new manifest + wr.Delete(entry) + case entry.Status() == iceberg.EntryStatusADDED && entry.SnapshotID() == m.snap.snapshotID: + // added entries from this snapshot are still added, otherwise they should be existing + wr.Add(entry) + case entry.Status() != iceberg.EntryStatusDELETED: + // add all non-deleted files from the old manifest as existing files + wr.Existing(entry) + } + } + } + + return wr.ToManifestFile(path, counter.Count) +} + +func (m *manifestMergeManager) mergeGroup(firstManifest iceberg.ManifestFile, specID int, manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + packer := internal.SlicePacker[iceberg.ManifestFile]{ + TargetWeight: int64(m.targetSizeBytes), + Lookback: 1, + LargestBinFirst: false, + } + bins := packer.PackEnd(manifests, func(m iceberg.ManifestFile) int64 { + return m.Length() + }) + + mergeBin := func(bin []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + output := make([]iceberg.ManifestFile, 0, 1) + if len(bin) == 1 { + output = append(output, bin[0]) + } else if len(bin) < m.minCountToMerge && slices.ContainsFunc(bin, func(m iceberg.ManifestFile) bool { return m == firstManifest }) { + // if the bin has the first manifest (the new data files or an appended + // manifest file) then only merge it if the number of manifests is above + // the minimum count. this is applied only to bins with an in-memory manifest + // so that large manifests don't prevent merging older groups + output = append(output, bin...) 
+ } else { + created, err := m.createManifest(specID, bin) + if err != nil { + return nil, err + } + output = append(output, created) + } + + return output, nil + } + + binResults := make([][]iceberg.ManifestFile, len(bins)) + g := errgroup.Group{} + for i, bin := range bins { + i, bin := i, bin + g.Go(func() error { + var err error + binResults[i], err = mergeBin(bin) + + return err + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + return slices.Concat(binResults...), nil +} + +func (m *manifestMergeManager) mergeManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + if !m.mergeEnabled || len(manifests) == 0 { + return manifests, nil + } + + first := manifests[0] + groups := m.groupBySpec(manifests) + + merged := make([]iceberg.ManifestFile, 0, len(groups)) + for _, specID := range slices.Backward(slices.Sorted(maps.Keys(groups))) { + manifests, err := m.mergeGroup(first, specID, groups[specID]) + if err != nil { + return nil, err + } + + merged = append(merged, manifests...) 
+ } + + return merged, nil +} + +type producerImpl interface { + processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) + existingManifests() ([]iceberg.ManifestFile, error) + deletedEntries() ([]iceberg.ManifestEntry, error) +} + +func newManifestFileName(num int, commit uuid.UUID) string { + return fmt.Sprintf("%s-m%d.avro", commit, num) +} + +func newManifestListFileName(snapshotID int64, attempt int, commit uuid.UUID) string { + // mimics behavior of java + // https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 + return fmt.Sprintf("snap-%d-%d-%s.avro", snapshotID, attempt, commit) +} + +func newFastAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &fastAppendFiles{base: prod} + + return prod +} + +type fastAppendFiles struct { + base *snapshotProducer +} + +func (fa *fastAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + return manifests, nil +} + +func (fa *fastAppendFiles) existingManifests() ([]iceberg.ManifestFile, error) { + existing := make([]iceberg.ManifestFile, 0) + if fa.base.parentSnapshotID > 0 { + previous, err := fa.base.txn.meta.SnapshotByID(fa.base.parentSnapshotID) + if err != nil { + return nil, fmt.Errorf("could not find parent snapshot %d", fa.base.parentSnapshotID) + } + + manifests, err := previous.Manifests(fa.base.io) + if err != nil { + return nil, err + } + + for _, m := range manifests { + if m.HasAddedFiles() || m.HasExistingFiles() || m.SnapshotID() == fa.base.snapshotID { + existing = append(existing, m) + } + } + } + + return existing, nil +} + +func (fa *fastAppendFiles) deletedEntries() ([]iceberg.ManifestEntry, error) { + return nil, nil +} + +type mergeAppendFiles 
struct { + fastAppendFiles + + targetSizeBytes int + minCountToMerge int + mergeEnabled bool +} + +func newMergeAppendFilesProducer(op Operation, txn *Transaction, fs iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties) *snapshotProducer { + prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps) + prod.producerImpl = &mergeAppendFiles{ + fastAppendFiles: fastAppendFiles{base: prod}, + targetSizeBytes: snapshotProps.GetInt(ManifestTargetSizeBytesKey, ManifestTargetSizeBytesDefault), + minCountToMerge: snapshotProps.GetInt(ManifestMinMergeCountKey, ManifestMinMergeCountDefault), + mergeEnabled: snapshotProps.GetBool(ManifestMergeEnabledKey, ManifestMergeEnabledDefault), + } + + return prod +} + +func (m *mergeAppendFiles) processManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) { + unmergedDataManifests, unmergedDeleteManifests := []iceberg.ManifestFile{}, []iceberg.ManifestFile{} + for _, m := range manifests { + if m.ManifestContent() == iceberg.ManifestContentData { + unmergedDataManifests = append(unmergedDataManifests, m) + } else if m.ManifestContent() == iceberg.ManifestContentDeletes { + unmergedDeleteManifests = append(unmergedDeleteManifests, m) + } Review Comment: I was following the pyiceberg example [here](https://github.com/apache/iceberg-python/blob/a275ce5941dc85b1721562ea4d2e316e2b36b28e/pyiceberg/table/update/snapshot.py#L144); it looks like manifests which don't match either content type are just ignored. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org