zeroshade commented on code in PR #718:
URL: https://github.com/apache/iceberg-go/pull/718#discussion_r2790623167
##########
table/table_test.go:
##########
@@ -1663,6 +1664,55 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table
overwrite becomes append
}
+// TestDelete verifies that Table.Delete properly delegates to
Transaction.Delete
+func (t *TableWritingTestSuite) TestDelete() {
+ testCases := []struct {
+ name string
+ table *table.Table
+ expectedErr error
+ }{
+ {
+ name: "success with copy-on-write",
+ table: t.createTable(
+ table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ t.formatVersion,
+ *iceberg.UnpartitionedSpec,
+ t.tableSchema,
+ ),
+ expectedErr: nil,
+ },
+ {
+ name: "abort on merge-on-read",
+ table: t.createTableWithProps(
+ table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ map[string]string{
+ table.PropertyFormatVersion:
strconv.Itoa(t.formatVersion),
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ t.tableSchema,
+ ),
+ expectedErr: errors.New("only 'copy-on-write' is
currently supported"),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func() {
+ resultTbl, err := tc.table.Delete(t.ctx,
iceberg.EqualTo(iceberg.Reference("foo"), "bar"), nil)
Review Comment:
Can we use the scanner to verify the deletion was correct and validate the
data left in the table?
##########
table/transaction.go:
##########
@@ -595,55 +595,136 @@ func (t *Transaction) Overwrite(ctx context.Context, rdr
array.RecordReader, sna
apply(&overwrite)
}
+ updater, err := t.performCopyOnWriteDeletion(ctx, snapshotProps,
overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+ if err != nil {
+ return err
+ }
+
fs, err := t.tbl.fsF(ctx)
if err != nil {
return err
}
+ itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: rdr.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &updater.commitUuid,
+ })
+
+ for df, err := range itr {
+ if err != nil {
+ return err
+ }
+ updater.appendDataFile(df)
+ }
+
+ updates, reqs, err := updater.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+func (t *Transaction) performCopyOnWriteDeletion(ctx context.Context,
snapshotProps iceberg.Properties, filter iceberg.BooleanExpression,
caseSensitive bool, concurrency int) (*snapshotProducer, error) {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return nil, err
+ }
if t.meta.NameMapping() == nil {
nameMapping := t.meta.CurrentSchema().NameMapping()
mappingJson, err := json.Marshal(nameMapping)
if err != nil {
- return err
+ return nil, err
}
err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
if err != nil {
- return err
+ return nil, err
}
}
commitUUID := uuid.New()
updater := t.updateSnapshot(fs,
snapshotProps).mergeOverwrite(&commitUUID)
- filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, overwrite.filter, overwrite.caseSensitive, overwrite.concurrency)
+ filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter, caseSensitive, concurrency)
if err != nil {
- return err
+ return nil, err
}
for _, df := range filesToDelete {
updater.deleteDataFile(df)
}
if len(filesToRewrite) > 0 {
- if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, overwrite.filter, overwrite.caseSensitive,
overwrite.concurrency); err != nil {
- return err
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter, caseSensitive, concurrency); err != nil {
+ return nil, err
}
}
- itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
- sc: rdr.Schema(),
- itr: array.IterFromReader(rdr),
- fs: fs.(io.WriteFileIO),
- writeUUID: &updater.commitUuid,
- })
+ return updater, nil
+}
- for df, err := range itr {
- if err != nil {
- return err
+type DeleteOption func(deleteOp *deleteOperation)
+
+type deleteOperation struct {
+ caseSensitive bool
+ concurrency int
+}
+
+// WithDeleteConcurrency overwrites the default concurrency for delete
operations.
+// Default: runtime.GOMAXPROCS(0)
+func WithDeleteConcurrency(concurrency int) OverwriteOption {
Review Comment:
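Should this return `DeleteOption` rather than `OverwriteOption`? The new `DeleteOption` type is declared just above, but this constructor is still typed as an overwrite option.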
##########
table/table_test.go:
##########
@@ -1663,6 +1664,55 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
t.Equal(table.OpAppend, snapshot.Summary.Operation) // Empty table
overwrite becomes append
}
+// TestDelete verifies that Table.Delete properly delegates to
Transaction.Delete
+func (t *TableWritingTestSuite) TestDelete() {
+ testCases := []struct {
+ name string
+ table *table.Table
+ expectedErr error
+ }{
+ {
+ name: "success with copy-on-write",
+ table: t.createTable(
+ table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ t.formatVersion,
+ *iceberg.UnpartitionedSpec,
+ t.tableSchema,
+ ),
+ expectedErr: nil,
+ },
+ {
+ name: "abort on merge-on-read",
+ table: t.createTableWithProps(
+ table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ map[string]string{
+ table.PropertyFormatVersion:
strconv.Itoa(t.formatVersion),
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ t.tableSchema,
+ ),
+ expectedErr: errors.New("only 'copy-on-write' is
currently supported"),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func() {
+ resultTbl, err := tc.table.Delete(t.ctx,
iceberg.EqualTo(iceberg.Reference("foo"), "bar"), nil)
+
+ // If an error was expected, check that it's the
correct one and abort validating the operation
+ if tc.expectedErr != nil {
+ t.Require().ErrorContains(err,
tc.expectedErr.Error())
+
+ return
+ }
+
+ snapshot := resultTbl.CurrentSnapshot()
+ t.NotNil(snapshot)
+ t.Equal(table.OpAppend, snapshot.Summary.Operation) //
Empty table overwrite becomes append
Review Comment:
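Wouldn't this be a Delete operation rather than `table.OpAppend`? The assertion and its "Empty table overwrite becomes append" comment look carried over from the overwrite test.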
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]