laskoviymishka commented on code in PR #482:
URL: https://github.com/apache/iceberg-go/pull/482#discussion_r2194537734


##########
table/transaction.go:
##########
@@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp
        return t.apply(updates, reqs)
 }
 
+// DynamicPartitionOverwrite performs a dynamic partition overwrite operation.
+// It detects partition values in the provided arrow table using the current
+// partition spec, deletes existing partitions matching these values, and then
+// appends the new data.
+func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error {
+       if t.meta.CurrentSpec().IsUnpartitioned() {
+               return fmt.Errorf("%w: cannot apply dynamic overwrite on an 
unpartitioned table", ErrInvalidOperation)
+       }
+
+       // Check that all partition fields use identity transforms
+       currentSpec := t.meta.CurrentSpec()
+       for field := range currentSpec.Fields() {
+               if _, ok := field.Transform.(iceberg.IdentityTransform); !ok {
+                       return fmt.Errorf("%w: dynamic overwrite does not 
support non-identity-transform fields in partition spec: %s",
+                               ErrInvalidOperation, field.Name)
+               }
+       }
+
+       if tbl.NumRows() == 0 {
+               return nil
+       }
+
+       fs, err := t.tbl.fsF(ctx)
+       if err != nil {
+               return err
+       }
+
+       commitUUID := uuid.New()
+       rdr := array.NewTableReader(tbl, batchSize)
+       defer rdr.Release()
+       dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{
+               sc:        tbl.Schema(),
+               itr:       array.IterFromReader(rdr),
+               fs:        fs.(io.WriteFileIO),
+               writeUUID: &commitUUID,
+       })
+
+       var allDataFiles []iceberg.DataFile
+       for df, err := range dataFiles {
+               if err != nil {
+                       return err
+               }
+               allDataFiles = append(allDataFiles, df)
+       }
+
+       partitionsToOverwrite := make(map[string]struct{})
+       for _, df := range allDataFiles {
+               partitionKey := fmt.Sprintf("%v", df.Partition())
+               partitionsToOverwrite[partitionKey] = struct{}{}
+       }
+
+       deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite)
+
+       if err := t.Delete(ctx, deleteFilter, snapshotProps); err != nil {
+               return err
+       }
+
+       appendFiles := t.appendSnapshotProducer(fs, snapshotProps)
+       for _, df := range allDataFiles {
+               appendFiles.appendDataFile(df)
+       }
+
+       updates, reqs, err := appendFiles.commit()
+       if err != nil {
+               return err
+       }
+
+       return t.apply(updates, reqs)
+}
+
+// Delete performs a delete operation with the given filter and snapshot properties.
+func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error {

Review Comment:
   I'm a little doubtful here: DPO is supposed to delete whole files inside a partition, so the semantics of this Delete method are odd. From the description I would guess it deletes rows matching a predicate, but in fact it deletes whole files.
   
   Maybe it's worth renaming the method and making it private?
   Is there any need to keep it public?
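   For context, the file-level behavior described above follows from how DynamicPartitionOverwrite picks the partitions to replace: every data file produced for the incoming arrow table contributes its partition tuple to a set, and that set drives the delete before the append. Below is a minimal, self-contained sketch of that collection step using toy stand-in types only (not the iceberg-go API):

   package main

   import "fmt"

   // toy stand-in for a written data file; only its partition values matter here
   type dataFile struct {
           partition map[string]any
   }

   // distinctPartitions mirrors the loop in DynamicPartitionOverwrite: each
   // written data file is keyed by its partition tuple, so every partition
   // touched by the new data is overwritten exactly once, file by file.
   func distinctPartitions(files []dataFile) map[string]struct{} {
           out := make(map[string]struct{})
           for _, df := range files {
                   // fmt prints map keys in sorted order, so the key is deterministic
                   out[fmt.Sprintf("%v", df.partition)] = struct{}{}
           }
           return out
   }

   func main() {
           files := []dataFile{
                   {partition: map[string]any{"day": "2024-01-01"}},
                   {partition: map[string]any{"day": "2024-01-01"}},
                   {partition: map[string]any{"day": "2024-01-02"}},
           }
           // two distinct partitions -> whole files in those two partitions are
           // deleted before the new files are appended
           fmt.Println(len(distinctPartitions(files))) // 2
   }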



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
