lliangyu-lin commented on code in PR #482:
URL: https://github.com/apache/iceberg-go/pull/482#discussion_r2261145024


##########
table/transaction.go:
##########
@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files 
[]string, snapshotProp
        return t.apply(updates, reqs)
 }
 
+// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
+// It detects partition values in the provided arrow table using the current
+// partition spec, deletes existing partitions matching these values, and then
+// appends the new data.
+func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl 
arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
+       if t.meta.CurrentSpec().IsUnpartitioned() {
+               return fmt.Errorf("%w: cannot apply dynamic overwrite on an 
unpartitioned table", ErrInvalidOperation)
+       }
+
+       // Check that all partition fields use identity transforms
+       currentSpec := t.meta.CurrentSpec()
+       for field := range currentSpec.Fields() {
+               if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
+                       return fmt.Errorf("%w: dynamic overwrite does not 
support non-identity-transform fields in partition spec: %s",
+                               ErrInvalidOperation, field.Name)
+               }
+       }
+
+       if tbl.NumRows() == 0 {
+               return nil
+       }
+
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       commitUUID := uuid.New()
+       rdr := array.NewTableReader(tbl, batchSize)
+       defer rdr.Release()
+       dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, 
recordWritingArgs{
+               sc:        tbl.Schema(),
+               itr:       array.IterFromReader(rdr),
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &commitUUID,
+       })
+
+       var allDataFiles []iceberg.DataFile
+       for df, err := range dataFiles {
+               if err != nil {
+                       return err
+               }
+               allDataFiles = append(allDataFiles, df)
+       }
+
+       partitionsToOverwrite := make(map[string]struct{})
+       for _, df := range allDataFiles {
+               partitionKey := fmt.Sprintf("%v", df.Partition())
+               partitionsToOverwrite[partitionKey] = struct{}{}
+       }
+
+       deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)
+
+       if err := t.deleteFileByFilter(ctx, deleteFilter, snapshotProps); err 
!= nil {
+               return err
+       }
+
+       appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+       for _, df := range allDataFiles {
+               appendFiles.appendDataFile(df)
+       }
+
+       updates, reqs, err := appendFiles.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+// deleteFileByFilter performs a delete operation with the given filter and 
snapshot properties.
+func (t *Transaction) deleteFileByFilter(ctx context.Context, filter 
iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {

Review Comment:
   I'm also working on a complete delete API (CoW) that can delete row level 
and file level based on predicate in 
https://github.com/apache/iceberg-go/pull/518. 
   Hopefully we won't need this method once the full delete API is 
supported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to