lliangyu-lin commented on code in PR #482:
URL: https://github.com/apache/iceberg-go/pull/482#discussion_r2261145024
##########
table/transaction.go:
##########
@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files
[]string, snapshotProp
return t.apply(updates, reqs)
}
+// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
+// It detects partition values in the provided arrow table using the current
+// partition spec, deletes existing partitions matching these values, and then
+// appends the new data.
+func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl
arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
+ if t.meta.CurrentSpec().IsUnpartitioned() {
+ return fmt.Errorf("%w: cannot apply dynamic overwrite on an
unpartitioned table", ErrInvalidOperation)
+ }
+
+ // Check that all partition fields use identity transforms
+ currentSpec := t.meta.CurrentSpec()
+ for field := range currentSpec.Fields() {
+ if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
+ return fmt.Errorf("%w: dynamic overwrite does not
support non-identity-transform fields in partition spec: %s",
+ ErrInvalidOperation, field.Name)
+ }
+ }
+
+ if tbl.NumRows() == 0 {
+ return nil
+ }
+
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return err
+ }
+
+ commitUUID := uuid.New()
+ rdr := array.NewTableReader(tbl, batchSize)
+ defer rdr.Release()
+ dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
+ sc: tbl.Schema(),
+ itr: array.IterFromReader(rdr),
+ fs: fs.(io.WriteFileIO),
+ writeUUID: &commitUUID,
+ })
+
+ var allDataFiles []iceberg.DataFile
+ for df, err := range dataFiles {
+ if err != nil {
+ return err
+ }
+ allDataFiles = append(allDataFiles, df)
+ }
+
+ partitionsToOverwrite := make(map[string]struct{})
+ for _, df := range allDataFiles {
+ partitionKey := fmt.Sprintf("%v", df.Partition())
+ partitionsToOverwrite[partitionKey] = struct{}{}
+ }
+
+ deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)
+
+ if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err
!= nil {
+ return err
+ }
+
+ appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+ for _, df := range allDataFiles {
+ appendFiles.appendDataFile(df)
+ }
+
+ updates, reqs, err := appendFiles.commit()
+ if err != nil {
+ return err
+ }
+
+ return t.apply(updates, reqs)
+}
+
+// deleteFileByFilter performs a delete operation with the given filter and
snapshot properties.
+func (t *Transaction) deleteFileByFilter(ctx context.Context, filter
iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {
Review Comment:
I'm also working on a complete delete API (CoW) that can delete at both the
row level and the file level based on a predicate, in
https://github.com/apache/iceberg-go/pull/518.
Hopefully we don't need this method once the full delete API is
supported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]