zeroshade commented on code in PR #530:
URL: https://github.com/apache/iceberg-go/pull/530#discussion_r2270964350
##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
return append(result, unmergedDeleteManifests...), nil
}
+func newDeleteFilesProducer(
+ op Operation,
+ txn *Transaction,
+ fs iceio.WriteFileIO,
+ commitUUID *uuid.UUID,
+ snapshotProps iceberg.Properties,
+) *snapshotProducer {
+ prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+ prod.producerImpl = &deleteFiles{
Review Comment:
Shouldn't we default `predicate` to `AlwaysFalse{}`?
##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
return append(result, unmergedDeleteManifests...), nil
}
+func newDeleteFilesProducer(
+ op Operation,
+ txn *Transaction,
+ fs iceio.WriteFileIO,
+ commitUUID *uuid.UUID,
+ snapshotProps iceberg.Properties,
+) *snapshotProducer {
+ prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+ prod.producerImpl = &deleteFiles{
+ base: prod,
+ computed: false,
+ keepManifests: make([]iceberg.ManifestFile, 0),
+ removedEntries: make([]iceberg.ManifestEntry, 0),
+ partialRewrites: false,
+ }
+
+ return prod
+}
+
+type deleteFiles struct {
+ base *snapshotProducer
+
+ predicate iceberg.BooleanExpression
+ caseSensitive bool
+
+ computed bool
+ keepManifests []iceberg.ManifestFile
+ removedEntries []iceberg.ManifestEntry
+ partialRewrites bool
+}
+
+func (df *deleteFiles) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
+ schema := df.base.txn.meta.CurrentSchema()
+ spec := df.base.spec(specID)
+ project := newInclusiveProjection(schema, spec, df.caseSensitive)
+ partitionFilter, err := project(df.predicate)
+ if err != nil {
+ return nil, err
+ }
+ return partitionFilter, nil
Review Comment:
You can just do `return project(df.predicate)`, since the error check only forwards both return values unchanged.
##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
return append(result, unmergedDeleteManifests...), nil
}
+func newDeleteFilesProducer(
+ op Operation,
+ txn *Transaction,
+ fs iceio.WriteFileIO,
+ commitUUID *uuid.UUID,
+ snapshotProps iceberg.Properties,
+) *snapshotProducer {
+ prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+ prod.producerImpl = &deleteFiles{
+ base: prod,
+ computed: false,
+ keepManifests: make([]iceberg.ManifestFile, 0),
+ removedEntries: make([]iceberg.ManifestEntry, 0),
+ partialRewrites: false,
+ }
+
+ return prod
+}
+
+type deleteFiles struct {
+ base *snapshotProducer
+
+ predicate iceberg.BooleanExpression
+ caseSensitive bool
+
+ computed bool
Review Comment:
What does `computed` represent?
##########
table/snapshot_producers_internal_test.go:
##########
@@ -0,0 +1,110 @@
+package table
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/suite"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "testing"
+)
+
+type DeleteFilesTestSuite struct {
+ suite.Suite
+
+ ctx context.Context
+ tableSchema *iceberg.Schema
+ location string
+
+ formatVersion int
+}
+
+func (t *DeleteFilesTestSuite) SetupSuite() {
+ t.ctx = context.Background()
+
+ t.tableSchema = iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.Bool},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Int32},
+ iceberg.NestedField{ID: 4, Name: "qux", Type:
iceberg.PrimitiveTypes.Date})
+}
+
+func (t *DeleteFilesTestSuite) SetupTest() {
+ t.location = filepath.ToSlash(strings.Replace(t.T().TempDir(), "#", "",
-1))
+}
+
+func (t *DeleteFilesTestSuite) createTable(identifier Identifier,
formatVersion int, spec iceberg.PartitionSpec, sc *iceberg.Schema) *Table {
+ meta, err := NewMetadata(sc, &spec, UnsortedSortOrder,
+ t.location, iceberg.Properties{"format-version":
strconv.Itoa(formatVersion)})
+ t.Require().NoError(err)
+
+ return New(
+ identifier,
+ meta,
+ fmt.Sprintf("%s/metadata/%05d-%s.metadata.json", t.location, 1,
uuid.New().String()),
+ func(ctx context.Context) (iceio.IO, error) {
+ return iceio.LocalFS{}, nil
+ },
+ nil,
+ )
+}
+
+func (t *DeleteFilesTestSuite) TestEmptyTable() {
+ ident := Identifier{"default", "delete_files_table_empty_v" +
strconv.Itoa(t.formatVersion)}
+ table := t.createTable(ident, t.formatVersion,
+ *iceberg.UnpartitionedSpec, t.tableSchema)
+
+ tx := table.NewTransaction()
+ updater := tx.updateSnapshot(iceio.LocalFS{}, nil).delete()
+ df := updater.producerImpl.(*deleteFiles)
+ err := df.computeDeletes(iceberg.EqualTo(iceberg.Reference("foo"),
true), true)
+ t.Require().NoError(err)
+
+ updates, reqs, err := updater.commit()
Review Comment:
Makes sense. Perhaps we need another base function in the impls to allow the
producer to either override the `commit` method or otherwise indicate there was
no change and thus no need to create a new snapshot.
##########
table/snapshot_producers.go:
##########
@@ -410,6 +410,218 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
return append(result, unmergedDeleteManifests...), nil
}
+func newDeleteFilesProducer(
+ op Operation,
+ txn *Transaction,
+ fs iceio.WriteFileIO,
+ commitUUID *uuid.UUID,
+ snapshotProps iceberg.Properties,
+) *snapshotProducer {
+ prod := createSnapshotProducer(op, txn, fs, commitUUID, snapshotProps)
+ prod.producerImpl = &deleteFiles{
+ base: prod,
+ computed: false,
+ keepManifests: make([]iceberg.ManifestFile, 0),
+ removedEntries: make([]iceberg.ManifestEntry, 0),
+ partialRewrites: false,
+ }
+
+ return prod
+}
+
+type deleteFiles struct {
+ base *snapshotProducer
+
+ predicate iceberg.BooleanExpression
+ caseSensitive bool
+
+ computed bool
+ keepManifests []iceberg.ManifestFile
+ removedEntries []iceberg.ManifestEntry
+ partialRewrites bool
+}
+
+func (df *deleteFiles) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
+ schema := df.base.txn.meta.CurrentSchema()
+ spec := df.base.spec(specID)
+ project := newInclusiveProjection(schema, spec, df.caseSensitive)
+ partitionFilter, err := project(df.predicate)
+ if err != nil {
+ return nil, err
+ }
+ return partitionFilter, nil
+}
+
+func (df *deleteFiles) partitionFilters() *keyDefaultMap[int,
iceberg.BooleanExpression] {
+ return newKeyDefaultMapWrapErr(df.buildPartitionProjection)
+}
+
+func (df *deleteFiles) buildManifestEvaluator(specID int)
(func(iceberg.ManifestFile) (bool, error), error) {
+ spec := df.base.spec(specID)
+ schema := df.base.txn.meta.CurrentSchema()
+ return newManifestEvaluator(
+ spec,
+ schema,
+ df.partitionFilters().Get(specID),
+ df.caseSensitive)
+}
+
+func (df *deleteFiles) copyWithNewStatus(entry iceberg.ManifestEntry, status
iceberg.ManifestEntryStatus) iceberg.ManifestEntry {
+ snapId := df.base.snapshotID
+ if status == iceberg.EntryStatusEXISTING {
+ snapId = entry.SnapshotID()
+ }
+
+ seqNum := entry.SequenceNum()
+
+ return iceberg.NewManifestEntry(
+ status,
+ &snapId,
+ &seqNum,
+ entry.FileSequenceNum(),
+ entry.DataFile(),
+ )
+}
+
+func (df *deleteFiles) computeDeletes(predicate iceberg.BooleanExpression,
caseSensitive bool) error {
+ schema := df.base.txn.meta.CurrentSchema()
+ manifestEvaluators := newKeyDefaultMapWrapErr(df.buildManifestEvaluator)
+ strictMetricsEvaluator, err := newStrictMetricsEvaluator(schema,
predicate, caseSensitive, false)
+ if err != nil {
+ return err
+ }
+ inclusiveMetricsEvaluator, err := newInclusiveMetricsEvaluator(schema,
predicate, caseSensitive, false)
+ if err != nil {
+ return err
+ }
+
+ // table has no snapshot, nothing to compute
+ if df.base.parentSnapshotID <= 0 {
+ return nil
+ }
+
+ snapshot, err := df.base.txn.meta.SnapshotByID(df.base.parentSnapshotID)
+ if err != nil {
+ return err
+ }
+ manifestFiles, err := snapshot.Manifests(df.base.io)
+ if err != nil {
+ return err
+ }
+
+ for _, manifestFile := range manifestFiles {
+ if manifestFile.ManifestContent() ==
iceberg.ManifestContentData {
+ containMatch, err :=
manifestEvaluators.Get(int(manifestFile.PartitionSpecID()))(manifestFile)
+ if err != nil {
+ return err
+ }
+
+ if !containMatch {
+ // if manifest file doesn't contain relevant
rows matched the predicate,
+ // keep it in the manifest list
+ df.keepManifests = append(df.keepManifests,
manifestFile)
+ } else {
+ // else evaluate each manifest entry and the
corresponding data file in the manifest file
+ deleteEntries := make([]iceberg.ManifestEntry,
0)
+ keepEntries := make([]iceberg.ManifestEntry, 0)
+ entries, err :=
manifestFile.FetchEntries(df.base.io, true)
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range entries {
+ ok, err :=
strictMetricsEvaluator(entry.DataFile())
+ if err != nil {
+ return err
+ }
+ if ok == rowsMustMatch {
+ // based on entry metadata, all
rows in the data file matched the predicate
+ deleteEntries =
append(deleteEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusDELETED))
+ } else {
+ // can't determine based on
entry metadata
+ keepEntries =
append(keepEntries, df.copyWithNewStatus(entry, iceberg.EntryStatusEXISTING))
+ ok, err =
inclusiveMetricsEvaluator(entry.DataFile())
+ if err != nil {
+ return err
+ }
+ if ok != rowsMightNotMatch {
+ df.partialRewrites =
true
+ }
+ }
+ }
+
+ if len(deleteEntries) > 0 {
+ df.removedEntries =
append(df.removedEntries, deleteEntries...)
+
+ // rewrite the manifests
+ if len(keepEntries) > 0 {
+ spec :=
df.base.spec(int(manifestFile.PartitionSpecID()))
+ wr, path, counter, err :=
df.base.newManifestWriter(spec)
+ if err != nil {
+ return err
+ }
+ for _, entry := range
keepEntries {
+ err = wr.Add(entry)
+ if err != nil {
+ return err
+ }
+ }
+ if err = wr.Close(); err != nil
{
+ return err
+ }
+ mf, err :=
wr.ToManifestFile(path, counter.Count)
+ if err != nil {
+ return err
+ }
+ df.keepManifests =
append(df.keepManifests, mf)
+ }
+ } else {
+ df.keepManifests =
append(df.keepManifests, manifestFile)
+ }
+ }
+ } else {
+ df.keepManifests = append(df.keepManifests,
manifestFile)
+ }
+ }
+ df.computed = true
+
+ return nil
+}
+
+func (df *deleteFiles) ensureComputed() error {
+ if !df.computed {
+ err := df.computeDeletes(iceberg.AlwaysFalse{}, true)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
Review Comment:
You've got a race condition here: `df.computed` is checked and later set without
any synchronization. You need to lock around this or use a `sync.Once`, or
something equivalent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]