liucao-dd opened a new issue, #16785:
URL: https://github.com/apache/iceberg/issues/16785

   ### Feature Request / Improvement
   
   ## Summary
   
   I'd like Iceberg's Spark integration to support scoped replacement writes. 
The operation would delete existing target rows whose replacement scope appears 
in the source, append the full source, and commit both parts as one Iceberg 
snapshot.
   
   One possible SQL shape:
   
   ```sql
   INSERT INTO target_table
   REPLACE USING (scope_col_1, scope_col_2)
   SELECT ...
   ```
   
   The exact syntax is less important than the behavior. Spark users need a way 
to say: this source is the complete replacement for the scopes it touches.
   
   ## Problem
   
   Some ingestion jobs produce complete groups of rows, not individual row 
changes. A job might recompute all rows for a customer, tenant, campaign, day, 
or other business scope. When a scope appears in the source, the desired table 
state is:
   
   1. Remove all existing target rows in that scope.
   2. Write exactly the source rows for that scope.
   3. Leave scopes that are absent from the source alone.
   
   That is different from a normal `MERGE INTO`.
   
   `MERGE INTO` can update matching rows and insert new rows, but it does not 
directly express "delete target rows in the touched scope that are absent from 
the source" while also inserting every source row once. That case matters when 
the source is a full replacement for a group and the new group has fewer rows 
than the old group: the rows that dropped out of the group have no matching 
source row, so a plain upsert leaves them in the target.
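   
   To make the shrinking-group case concrete, here is a minimal in-memory 
sketch of a keyed upsert. It is plain Java, not Iceberg or Spark code, and the 
class name, method name, and `[scope, rowKey, value]` row layout are 
hypothetical choices made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; not Iceberg or Spark code.
final class KeyedUpsertSketch {

  // Each row is [scope, rowKey, value]. Rows are matched on (scope, rowKey),
  // roughly what WHEN MATCHED / WHEN NOT MATCHED clauses express.
  static List<String[]> upsert(List<String[]> target, List<String[]> source) {
    Map<String, String[]> byKey = new LinkedHashMap<>();
    for (String[] row : target) {
      byKey.put(row[0] + "/" + row[1], row);
    }
    for (String[] row : source) {
      // Overwrites the row if the key is already present, inserts it otherwise.
      byKey.put(row[0] + "/" + row[1], row);
    }
    return new ArrayList<>(byKey.values());
  }

  public static void main(String[] args) {
    List<String[]> target = Arrays.asList(
        new String[] {"customer-1", "a", "old"},
        new String[] {"customer-1", "b", "old"},
        new String[] {"customer-1", "c", "old"},
        new String[] {"customer-2", "a", "old"});
    // The recomputed group for customer-1 now has only two rows.
    List<String[]> source = Arrays.asList(
        new String[] {"customer-1", "a", "new"},
        new String[] {"customer-1", "b", "new"});

    for (String[] row : upsert(target, source)) {
      System.out.println(String.join(",", row));
    }
  }
}
```

   In this model the `customer-1` row with key `c` survives with its old value, 
even though the source was meant to be the complete new state of `customer-1`.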
   
   You can approximate this with `WHEN NOT MATCHED BY SOURCE ... THEN DELETE`, 
but that path has a separate performance problem in Spark. Target-side 
predicates from that clause are not always pushed down, so a tightly scoped 
refresh can still turn into a full target-table scan. This is the issue 
described in 
[apache/iceberg#11248](https://github.com/apache/iceberg/issues/11248). 
[apache/spark#52185](https://github.com/apache/spark/pull/52185) proposed 
pushing `WHEN NOT MATCHED BY SOURCE` predicates to the target because the 
current behavior can be too expensive for practical use.
   
   Partition overwrite is only a partial answer. It works when the replacement 
scope maps cleanly to Iceberg partitions. Many useful scopes are business keys 
or multi-column logical groups, not physical partitions. Even when the scope is 
partition-like, users should not have to reimplement Iceberg file rewrite 
behavior outside the Spark write path.
   
   ## Current options
   
   The straightforward SQL workaround is two statements, each committed as its 
own snapshot:
   
   ```sql
   DELETE FROM target_table
   WHERE (scope_col_1, scope_col_2) IN (SELECT scope_col_1, scope_col_2 FROM 
source_view);
   
   INSERT INTO target_table
   SELECT * FROM source_view;
   ```
   
   That has real drawbacks:
   
   - The replacement is not atomic. Readers can see the table after the delete 
commits and before the insert commits.
   - Concurrent writers have a larger conflict window across two commits.
   - The source may need to be materialized outside the query so the delete 
scope and inserted rows come from the same source evaluation.
   - Duplicate source keys, empty sources, null scope values, and concurrent 
writes are easy to handle incorrectly.
   
   In my internal use case, I worked around this by dropping below Spark SQL 
and using Iceberg's low-level rewrite APIs from application code. The 
workaround has the shape of an Iceberg rewrite action:
   
   1. Write the replacement rows through Spark.
   2. Use `SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID` so the new 
`DataFile` references are staged in `FileRewriteCoordinator`.
   3. Commit an `OverwriteFiles` operation that deletes the existing files for 
the touched scope and adds the staged replacement files.
   
   The staging write uses this public Iceberg mechanism:
   
   ```java
   data.writeTo(tablePath)
       .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetId)
       .append();
   ```
   
   The application then commits the file replacement manually:
   
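   ```java
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.OverwriteFiles;
   
   // existingFiles, newFiles, currentSnapshot, and partitionFilter are
   // prepared by the application before this point.
   OverwriteFiles overwrite = icebergTable.newOverwrite();
   
   // Remove the existing data files for the touched scope.
   for (DataFile file : existingFiles) {
     overwrite.deleteFile(file);
   }
   // Add the replacement files staged by the Spark write.
   for (DataFile file : newFiles) {
     overwrite.addFile(file);
   }
   
   // Fail the commit if a concurrent commit made after currentSnapshot
   // deleted data matching partitionFilter.
   if (currentSnapshot != null) {
     overwrite.validateFromSnapshot(currentSnapshot.snapshotId());
     overwrite.conflictDetectionFilter(partitionFilter);
     overwrite.validateNoConflictingDeletes();
   }
   
   // The delete and the add land in a single snapshot.
   overwrite.commit();
   ```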
   
   This gives me a single snapshot, but it is not code I would expect most 
Spark users to write correctly. The application has to stage files, preserve 
residual rows from rewritten files, wire conflict detection, and restrict the 
operation to the cases it can model safely.
   
   ## Proposed semantics
   
   Add a Spark write command for scoped replacement:
   
   ```sql
   INSERT INTO target_table
   REPLACE USING (col1 [, col2 ...])
   <query>
   ```
   
   The command would:
   
   - evaluate the source query as the replacement data;
   - compute the touched scope from the distinct `REPLACE USING` columns in the 
source;
   - delete target rows whose scope matches the source scope;
   - append all source rows once;
   - commit the delete and append in one Iceberg snapshot;
   - leave target scopes not present in the source unchanged.
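   
   The steps above can be modeled on in-memory rows. This is only a sketch of 
the intended result, not a proposal for how Spark or Iceberg would implement 
it. The class and method names are hypothetical, rows are lists of strings, 
the `REPLACE USING` columns are given by position, and scope matching uses 
plain string equality. How null scope values should match is not addressed 
here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the proposed semantics;
// not Iceberg or Spark code.
final class ScopedReplaceSketch {

  // Projects a row onto the REPLACE USING columns, given their positions.
  static List<String> scopeOf(List<String> row, int[] scopeColumns) {
    List<String> scope = new ArrayList<>();
    for (int column : scopeColumns) {
      scope.add(row.get(column));
    }
    return scope;
  }

  static List<List<String>> replaceUsing(
      List<List<String>> target, List<List<String>> source, int[] scopeColumns) {
    // Touched scope: the distinct scope values present in the source.
    Set<List<String>> touched = new HashSet<>();
    for (List<String> row : source) {
      touched.add(scopeOf(row, scopeColumns));
    }

    List<List<String>> result = new ArrayList<>();
    // Keep only the target rows whose scope the source does not touch.
    for (List<String> row : target) {
      if (!touched.contains(scopeOf(row, scopeColumns))) {
        result.add(row);
      }
    }
    // Append every source row once, including rows that share a scope.
    result.addAll(source);
    return result;
  }

  public static void main(String[] args) {
    // Columns: tenant, day, value. The scope is (tenant, day).
    List<List<String>> target = Arrays.asList(
        Arrays.asList("t1", "d1", "old-1"),
        Arrays.asList("t1", "d1", "old-2"),
        Arrays.asList("t1", "d2", "keep"),
        Arrays.asList("t2", "d1", "keep"));
    List<List<String>> source = Arrays.asList(
        Arrays.asList("t1", "d1", "new-1"),
        Arrays.asList("t1", "d1", "new-2"));

    for (List<String> row : replaceUsing(target, source, new int[] {0, 1})) {
      System.out.println(String.join(",", row));
    }
  }
}
```

   In this model an empty source touches no scope, so the target is returned 
unchanged. A real command would produce the same end state in one snapshot 
rather than by building a new list.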
   
   An arbitrary-condition form could come later:
   
   ```sql
   INSERT INTO target_table
   REPLACE ON <boolean_expr>
   <query>
   ```
   
   The column-list form is enough for the common group refresh case and gives 
Spark users a clear way to express the intent.
   
   ## Rationale
   
   Delta Lake's `replaceUsing` and `replaceOn` are useful prior art. I am not 
asking Iceberg to match Delta's API. I am asking for the same underlying 
operation to be expressible in Iceberg Spark: one atomic replacement for 
logical scopes touched by the source.
   
   This should run through Iceberg's Spark write path. Then table configuration 
can still decide copy-on-write, merge-on-read, and deletion-vector behavior, 
and users do not have to choose between a two-commit delete plus insert, a 
stretched `MERGE INTO`, or hand-written rewrite code with conflict validation.
   
   ### Query engine
   
   Spark
   
   ### Willingness to contribute
   
   - [x] I can contribute this improvement/feature independently
   - [x] I would be willing to contribute this improvement/feature with 
guidance from the Iceberg community
   - [ ] I cannot contribute this improvement/feature at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

