zeroshade commented on code in PR #674:
URL: https://github.com/apache/iceberg-go/pull/674#discussion_r2738875761
##########
table/transaction.go:
##########
@@ -499,6 +499,282 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ for _, manifest := range manifests {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
manifest: %w", err)
Review Comment:
we should add the manifest path to this error. i.e. `fmt.Errorf("failed to
evaluate manifest %s: %w", manifest.ManifestPath(), err)`
##########
table/table.go:
##########
@@ -129,6 +129,26 @@ func (t Table) Append(ctx context.Context, rdr
array.RecordReader, snapshotProps
return txn.Commit(ctx)
}
+// OverwriteTable is a shortcut for NewTransaction().OverwriteTable() and then
committing the transaction
+func (t Table) OverwriteTable(ctx context.Context, tbl arrow.Table, batchSize
int64, filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) (*Table, error) {
Review Comment:
let's add to the comment that `batchSize` refers to reading the input
rather than the batch size of the writes.
##########
table/transaction.go:
##########
@@ -499,6 +499,282 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ for _, manifest := range manifests {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
manifest: %w", err)
+ }
+ if !match {
+ continue
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to fetch manifest
entries: %w", err)
+ }
+
+ for _, entry := range entries {
+ if entry.Status() == iceberg.EntryStatusDELETED {
+ continue
+ }
+
+ df := entry.DataFile()
+ if df.ContentType() != iceberg.EntryContentData {
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ filesToDelete = append(filesToDelete, df)
+ } else {
+ filesToRewrite = append(filesToRewrite, df)
+ }
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ // Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
+ rewriteUUID := uuid.New()
+ for _, originalFile := range files {
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID)
+ if err != nil {
+ return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
Review Comment:
can we parallelize this at all?
##########
table/table_test.go:
##########
@@ -368,7 +368,7 @@ func (t *TableWritingTestSuite) createTable(identifier
table.Identifier, formatV
func(ctx context.Context) (iceio.IO, error) {
return iceio.LocalFS{}, nil
},
- nil,
+ &mockedCatalog{meta},
Review Comment:
Is the mockedCatalog necessary for overwrite? If so, we should add a comment
explaining why it is needed here.
##########
table/transaction.go:
##########
@@ -499,6 +499,282 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ for _, manifest := range manifests {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
manifest: %w", err)
Review Comment:
can we parallelize this at all?
##########
table/transaction.go:
##########
@@ -499,6 +499,282 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ for _, manifest := range manifests {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
manifest: %w", err)
+ }
+ if !match {
+ continue
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to fetch manifest
entries: %w", err)
+ }
+
+ for _, entry := range entries {
+ if entry.Status() == iceberg.EntryStatusDELETED {
+ continue
+ }
+
+ df := entry.DataFile()
+ if df.ContentType() != iceberg.EntryContentData {
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ filesToDelete = append(filesToDelete, df)
+ } else {
+ filesToRewrite = append(filesToRewrite, df)
+ }
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ // Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
+ rewriteUUID := uuid.New()
+ for _, originalFile := range files {
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID)
+ if err != nil {
+ return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
+
+ return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID
uuid.UUID) ([]iceberg.DataFile, error) {
+ scanTask := &FileScanTask{
+ File: originalFile,
+ Start: 0,
+ Length: originalFile.FileSizeBytes(),
+ }
+
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: true,
+ rowLimit: -1, // No limit
+ concurrency: 1,
+ }
+
+ _, recordIter, err := scanner.GetRecords(ctx, []FileScanTask{*scanTask})
+ if err != nil {
+ return nil, fmt.Errorf("failed to get records from original
file: %w", err)
+ }
+
+ var records []arrow.RecordBatch
+ for record, err := range recordIter {
+ if err != nil {
+ return nil, fmt.Errorf("failed to read record: %w", err)
+ }
+ records = append(records, record)
+ }
+
+ // If no records remain after filtering, don't create any new files
+ // we shouldn't hit this case given that we only run this after
determining this is a file to rewrite
Review Comment:
You're correct to handle this, and we can definitely hit this case: the
classification logic uses file statistics to make its decisions, so it's entirely
possible (though rare) to end up here. The comment should reflect this.
##########
table/transaction.go:
##########
@@ -499,6 +499,282 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// OverwriteTable overwrites the table data using an Arrow Table, optionally
with a filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) OverwriteTable(ctx context.Context, tbl arrow.Table,
batchSize int64, filter iceberg.BooleanExpression, caseSensitive bool,
snapshotProps iceberg.Properties) error {
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+
+ return t.Overwrite(ctx, rdr, filter, caseSensitive, snapshotProps)
+}
+
+// Overwrite overwrites the table data using a RecordReader, optionally with a
filter.
+// If filter is nil or AlwaysTrue, all existing data will be replaced.
+// If filter is provided, only data matching the filter will be replaced.
+func (t *Transaction) Overwrite(ctx context.Context, rdr array.RecordReader,
filter iceberg.BooleanExpression, caseSensitive bool, snapshotProps
iceberg.Properties) error {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
+
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter)
+ if err != nil {
+ return err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(filesToRewrite) > 0 {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter); err != nil {
+ return err
+ }
+ }
+
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ s := t.meta.currentSnapshot()
+ if s == nil {
+ return nil, nil, nil
+ }
+
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ for df, err := range s.dataFiles(fs, nil) {
+ if err != nil {
+ return nil, nil, err
+ }
+ if df.ContentType() == iceberg.EntryContentData {
+ filesToDelete = append(filesToDelete, df)
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+ }
+
+ return t.classifyFilesForFilteredOverwrite(ctx, fs, filter)
+}
+
+// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+// Returns files to delete completely, files to rewrite partially, and any
error.
+func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression) (filesToDelete, filesToRewrite
[]iceberg.DataFile, err error) {
+ schema := t.meta.CurrentSchema()
+
+ inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
true, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ }
+
+ strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, true,
false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ }
+
+ var manifestEval func(iceberg.ManifestFile) (bool, error)
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ manifestEval, err = newManifestEvaluator(spec, schema, filter,
true)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
+ }
+ }
+
+ s := t.meta.currentSnapshot()
+ manifests, err := s.Manifests(fs)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get manifests: %w", err)
+ }
+
+ for _, manifest := range manifests {
+ if manifestEval != nil {
+ match, err := manifestEval(manifest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
manifest: %w", err)
+ }
+ if !match {
+ continue
+ }
+ }
+
+ entries, err := manifest.FetchEntries(fs, false)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to fetch manifest
entries: %w", err)
+ }
+
+ for _, entry := range entries {
+ if entry.Status() == iceberg.EntryStatusDELETED {
+ continue
+ }
+
+ df := entry.DataFile()
+ if df.ContentType() != iceberg.EntryContentData {
+ continue
+ }
+
+ inclusive, err := inclusiveEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with inclusive evaluator: %w", df.FilePath(), err)
+ }
+
+ if !inclusive {
+ continue
+ }
+
+ strict, err := strictEvaluator(df)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to evaluate
data file %s with strict evaluator: %w", df.FilePath(), err)
+ }
+
+ if strict {
+ filesToDelete = append(filesToDelete, df)
+ } else {
+ filesToRewrite = append(filesToRewrite, df)
+ }
+ }
+ }
+
+ return filesToDelete, filesToRewrite, nil
+}
+
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression) error {
+ complementFilter := iceberg.NewNot(filter)
+
+ // Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
+ rewriteUUID := uuid.New()
+ for _, originalFile := range files {
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, rewriteUUID)
+ if err != nil {
+ return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
+ }
+
+ updater.deleteDataFile(originalFile)
+ for _, rewrittenFile := range rewrittenFiles {
+ updater.appendDataFile(rewrittenFile)
+ }
+ }
+
+ return nil
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, commitUUID
uuid.UUID) ([]iceberg.DataFile, error) {
+ scanTask := &FileScanTask{
+ File: originalFile,
+ Start: 0,
+ Length: originalFile.FileSizeBytes(),
+ }
+
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
true)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: true,
+ rowLimit: -1, // No limit
+ concurrency: 1,
Review Comment:
we probably want more than concurrency 1 and should allow the caller to
specify it somehow
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]