zeroshade commented on code in PR #530:
URL: https://github.com/apache/iceberg-go/pull/530#discussion_r2270964350


##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
        return append(result, unmergedDeleteManifests...), nil
 }
 
+func newDeleteFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &deleteFiles{

Review Comment:
   Shouldn't we default `predicate` to `iceberg.AlwaysFalse{}` here?



##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
        return append(result, unmergedDeleteManifests...), nil
 }
 
+func newDeleteFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &deleteFiles{
+               base:            prod,
+               computed:        false,
+               keepManifests:   make([]iceberg.ManifestFile, 0),
+               removedEntries:  make([]iceberg.ManifestEntry, 0),
+               partialRewrites: false,
+       }
+
+       return prod
+}
+
+type deleteFiles struct {
+       base *snapshotProducer
+
+       predicate     iceberg.BooleanExpression
+       caseSensitive bool
+
+       computed        bool
+       keepManifests   []iceberg.ManifestFile
+       removedEntries  []iceberg.ManifestEntry
+       partialRewrites bool
+}
+
+func (df *deleteFiles) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
+       schema := df.base.txn.meta.CurrentSchema()
+       spec := df.base.spec(specID)
+       project := newInclusiveProjection(schema, spec, df.caseSensitive)
+       partitionFilter, err := project(df.predicate)
+       if err != nil {
+               return nil, err
+       }
+       return partitionFilter, nil

Review Comment:
   You can just do `return project(df.predicate)`.



##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
        return append(result, unmergedDeleteManifests...), nil
 }
 
+func newDeleteFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &deleteFiles{
+               base:            prod,
+               computed:        false,
+               keepManifests:   make([]iceberg.ManifestFile, 0),
+               removedEntries:  make([]iceberg.ManifestEntry, 0),
+               partialRewrites: false,
+       }
+
+       return prod
+}
+
+type deleteFiles struct {
+       base *snapshotProducer
+
+       predicate     iceberg.BooleanExpression
+       caseSensitive bool
+
+       computed        bool

Review Comment:
   What does `computed` represent?



##########
table/snapshot_producers_internal_test.go:
##########
@@ -0,0 +1,110 @@
+package table
+
+import (
+       "context"
+       "fmt"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+)
+
+type DeleteFilesTestSuite struct {
+       suite.Suite
+
+       ctx         context.Context
+       tableSchema *iceberg.Schema
+       location    string
+
+       formatVersion int
+}
+
+func (t *DeleteFilesTestSuite) SetupSuite() {
+       t.ctx = context.Background()
+
+       t.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.Bool},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: 
iceberg.PrimitiveTypes.Int32},
+               iceberg.NestedField{ID: 4, Name: "qux", Type: 
iceberg.PrimitiveTypes.Date})
+}
+
+func (t *DeleteFilesTestSuite) SetupTest() {
+       t.location = filepath.ToSlash(strings.Replace(t.T().TempDir(), "#", "", 
-1))
+}
+
+func (t *DeleteFilesTestSuite) createTable(identifier Identifier, 
formatVersion int, spec iceberg.PartitionSpec, sc *iceberg.Schema) *Table {
+       meta, err := NewMetadata(sc, &spec, UnsortedSortOrder,
+               t.location, iceberg.Properties{"format-version": 
strconv.Itoa(formatVersion)})
+       t.Require().NoError(err)
+
+       return New(
+               identifier,
+               meta,
+               fmt.Sprintf("%s/metadata/%05d-%s.metadata.json", t.location, 1, 
uuid.New().String()),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               nil,
+       )
+}
+
+func (t *DeleteFilesTestSuite) TestEmptyTable() {
+       ident := Identifier{"default", "delete_files_table_empty_v" + 
strconv.Itoa(t.formatVersion)}
+       table := t.createTable(ident, t.formatVersion,
+               *iceberg.UnpartitionedSpec, t.tableSchema)
+
+       tx := table.NewTransaction()
+       updater := tx.updateSnapshot(iceio.LocalFS{}, nil).delete()
+       df := updater.producerImpl.(*deleteFiles)
+       err := df.computeDeletes(iceberg.EqualTo(iceberg.Reference("foo"), 
true), true)
+       t.Require().NoError(err)
+
+       updates, reqs, err := updater.commit()

Review Comment:
   Makes sense. Perhaps we need another base method in the producer impls that 
lets a producer either override the `commit` method or signal that nothing 
changed, so there is no need to create a new snapshot.



##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests 
[]iceberg.ManifestFile) ([
        return append(result, unmergedDeleteManifests...), nil
 }
 
+func newDeleteFilesProducer(
+       op Operation,
+       txn *Transaction,
+       fs iceio.WriteFileIO,
+       commitUUID *uuid.UUID,
+       snapshotProps iceberg.Properties,
+) *snapshotProducer {
+       prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+       prod.producerImpl = &deleteFiles{
+               base:            prod,
+               computed:        false,
+               keepManifests:   make([]iceberg.ManifestFile, 0),
+               removedEntries:  make([]iceberg.ManifestEntry, 0),
+               partialRewrites: false,
+       }
+
+       return prod
+}
+
+type deleteFiles struct {
+       base *snapshotProducer
+
+       predicate     iceberg.BooleanExpression
+       caseSensitive bool
+
+       computed        bool
+       keepManifests   []iceberg.ManifestFile
+       removedEntries  []iceberg.ManifestEntry
+       partialRewrites bool
+}
+
+func (df *deleteFiles) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
+       schema := df.base.txn.meta.CurrentSchema()
+       spec := df.base.spec(specID)
+       project := newInclusiveProjection(schema, spec, df.caseSensitive)
+       partitionFilter, err := project(df.predicate)
+       if err != nil {
+               return nil, err
+       }
+       return partitionFilter, nil
+}
+
+func (df *deleteFiles) partitionFilters() *keyDefaultMap[int, 
iceberg.BooleanExpression] {
+       return newKeyDefaultMapWrapErr(df.buildPartitionProjection)
+}
+
+func (df *deleteFiles) buildManifestEvaluator(specID int) 
(func(iceberg.ManifestFile) (bool, error), error) {
+       spec := df.base.spec(specID)
+       schema := df.base.txn.meta.CurrentSchema()
+       return newManifestEvaluator(
+               spec,
+               schema,
+               df.partitionFilters().Get(specID),
+               df.caseSensitive)
+}
+
+func (df *deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status 
iceberg.ManifestEntryStatus) iceberg.ManifestEntry {
+       snapId := df.base.snapshotID
+       if status == iceberg.EntryStatusEXISTING {
+               snapId = entry.SnapshotID()
+       }
+
+       seqNum := entry.SequenceNum()
+
+       return iceberg.NewManifestEntry(
+               status,
+               &snapId,
+               &seqNum,
+               entry.FileSequenceNum(),
+               entry.DataFile(),
+       )
+}
+
+func (df *deleteFiles) computeDeletes(predicate iceberg.BooleanExpression, 
caseSensitive bool) error {
+       schema := df.base.txn.meta.CurrentSchema()
+       manifestEvaluators := newKeyDefaultMapWrapErr(df.buildManifestEvaluator)
+       strictMetricsEvaluator, err := newStrictMetricsEvaluator(schema, 
predicate, caseSensitive, false)
+       if err != nil {
+               return err
+       }
+       inclusiveMetricsEvaluator, err := newInclusiveMetricsEvaluator(schema, 
predicate, caseSensitive, false)
+       if err != nil {
+               return err
+       }
+
+       // table has no snapshot, nothing to compute
+       if df.base.parentSnapshotID <= 0 {
+               return nil
+       }
+
+       snapshot, err := df.base.txn.meta.SnapshotByID(df.base.parentSnapshotID)
+       if err != nil {
+               return err
+       }
+       manifestFiles, err := snapshot.Manifests(df.base.io)
+       if err != nil {
+               return err
+       }
+
+       for _, manifestFile := range manifestFiles {
+               if manifestFile.ManifestContent() == 
iceberg.ManifestContentData {
+                       containMatch, err := 
manifestEvaluators.Get(int(manifestFile.PartitionSpecID()))(manifestFile)
+                       if err != nil {
+                               return err
+                       }
+
+                       if !containMatch {
+                               // if manifest file doesn't contain relevant 
rows matched the predicate,
+                               // keep it in the manifest list
+                               df.keepManifests = append(df.keepManifests, 
manifestFile)
+                       } else {
+                               // else evaluate each manifest entry and the 
corresponding data file in the manifest file
+                               deleteEntries := make([]iceberg.ManifestEntry, 
0)
+                               keepEntries := make([]iceberg.ManifestEntry, 0)
+                               entries, err := 
manifestFile.FetchEntries(df.base.io, true)
+                               if err != nil {
+                                       return err
+                               }
+
+                               for _, entry := range entries {
+                                       ok, err := 
strictMetricsEvaluator(entry.DataFile())
+                                       if err != nil {
+                                               return err
+                                       }
+                                       if ok == rowsMustMatch {
+                                               // based on entry metadata, all 
rows in the data file matched the predicate
+                                               deleteEntries = 
append(deleteEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusDELETED))
+                                       } else {
+                                               // can't determine based on 
entry metadata
+                                               keepEntries = 
append(keepEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusEXISTING))
+                                               ok, err = 
inclusiveMetricsEvaluator(entry.DataFile())
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               if ok != rowsMightNotMatch {
+                                                       df.partialRewrites = 
true
+                                               }
+                                       }
+                               }
+
+                               if len(deleteEntries) > 0 {
+                                       df.removedEntries = 
append(df.removedEntries, deleteEntries...)
+
+                                       // rewrite the manifests
+                                       if len(keepEntries) > 0 {
+                                               spec := 
df.base.spec(int(manifestFile.PartitionSpecID()))
+                                               wr, path, counter, err := 
df.base.newManifestWriter(spec)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               for _, entry := range 
keepEntries {
+                                                       err = wr.Add(entry)
+                                                       if err != nil {
+                                                               return err
+                                                       }
+                                               }
+                                               if err = wr.Close(); err != nil 
{
+                                                       return err
+                                               }
+                                               mf, err := 
wr.ToManifestFile(path, counter.Count)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               df.keepManifests = 
append(df.keepManifests, mf)
+                                       }
+                               } else {
+                                       df.keepManifests = 
append(df.keepManifests, manifestFile)
+                               }
+                       }
+               } else {
+                       df.keepManifests = append(df.keepManifests, 
manifestFile)
+               }
+       }
+       df.computed = true
+
+       return nil
+}
+
+func (df *deleteFiles) ensureComputed() error {
+       if !df.computed {
+               err := df.computeDeletes(iceberg.AlwaysFalse{}, true)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}

Review Comment:
   You've got a race condition here: two concurrent callers can both see 
`computed == false` and both run `computeDeletes`. You need to lock around this, 
use a `sync.Once`, or use something equivalent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to