laskoviymishka commented on code in PR #482: URL: https://github.com/apache/iceberg-go/pull/482#discussion_r2194537734
########## table/transaction.go: ########## @@ -343,6 +345,250 @@ func (t *Transaction) AddFiles(ctx context.Context, files []string, snapshotProp return t.apply(updates, reqs) } +// DynamicPartitionOverwrite performs a dynamic partition overwrite operation. +// It detects partition values in the provided arrow table using the current +// partition spec, deletes existing partitions matching these values, and then +// appends the new data. +func (t *Transaction) DynamicPartitionOverwrite(ctx context.Context, tbl arrow.Table, batchSize int64, snapshotProps iceberg.Properties) error { + if t.meta.CurrentSpec().IsUnpartitioned() { + return fmt.Errorf("%w: cannot apply dynamic overwrite on an unpartitioned table", ErrInvalidOperation) + } + + // Check that all partition fields use identity transforms + currentSpec := t.meta.CurrentSpec() + for field := range currentSpec.Fields() { + if _, ok := field.Transform.(iceberg.IdentityTransform); !ok { + return fmt.Errorf("%w: dynamic overwrite does not support non-identity-transform fields in partition spec: %s", + ErrInvalidOperation, field.Name) + } + } + + if tbl.NumRows() == 0 { + return nil + } + + fs, err := t.tbl.fsF(ctx) + if err != nil { + return err + } + + commitUUID := uuid.New() + rdr := array.NewTableReader(tbl, batchSize) + defer rdr.Release() + dataFiles := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, recordWritingArgs{ + sc: tbl.Schema(), + itr: array.IterFromReader(rdr), + fs: fs.(io.WriteFileIO), + writeUUID: &commitUUID, + }) + + var allDataFiles []iceberg.DataFile + for df, err := range dataFiles { + if err != nil { + return err + } + allDataFiles = append(allDataFiles, df) + } + + partitionsToOverwrite := make(map[string]struct{}) + for _, df := range allDataFiles { + partitionKey := fmt.Sprintf("%v", df.Partition()) + partitionsToOverwrite[partitionKey] = struct{}{} + } + + deleteFilter := t.buildPartitionPredicate(partitionsToOverwrite) + + if err := t.Delete(ctx, deleteFilter, snapshotProps); err 
!= nil { + return err + } + + appendFiles := t.appendSnapshotProducer(fs, snapshotProps) + for _, df := range allDataFiles { + appendFiles.appendDataFile(df) + } + + updates, reqs, err := appendFiles.commit() + if err != nil { + return err + } + + return t.apply(updates, reqs) +} + +// Delete performs a delete operation with the given filter and snapshot properties. +func (t *Transaction) Delete(ctx context.Context, filter iceberg.BooleanExpression, snapshotProps iceberg.Properties) error { Review Comment: I have some doubts here. DPO is supposed to delete whole files within a partition, but the semantics of this `Delete` method are odd: from its description, I would expect it to delete rows matching a predicate, whereas in fact it deletes whole files. Maybe it's worth renaming the method and making it private? Is there any need to keep it public? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org