liucao-dd opened a new issue, #16785: URL: https://github.com/apache/iceberg/issues/16785
### Feature Request / Improvement

## Summary

I'd like Iceberg's Spark integration to support scoped replacement writes. The operation would delete existing target rows whose replacement scope appears in the source, append the full source, and commit both parts as one Iceberg snapshot.

One possible SQL shape:

```sql
INSERT INTO target_table
REPLACE USING (scope_col_1, scope_col_2)
SELECT ...
```

The exact syntax is less important than the behavior. Spark users need a way to say: this source is the complete replacement for the scopes it touches.

## Problem

Some ingestion jobs produce complete groups of rows, not individual row changes. A job might recompute all rows for a customer, tenant, campaign, day, or other business scope. When a scope appears in the source, the desired table state is:

1. Remove all existing target rows in that scope.
2. Write exactly the source rows for that scope.
3. Leave scopes that are absent from the source alone.

That is different from a normal `MERGE INTO`. `MERGE INTO` can update matching rows and insert new rows, but it does not directly express "delete target rows in the touched scope that are absent from the source" while also inserting every source row once. That case matters when the source is a full replacement for a group and the new group has fewer rows than the old group.

You can approximate this with `WHEN NOT MATCHED BY SOURCE ... THEN DELETE`, but that path has a separate performance problem in Spark. Target-side predicates from that clause are not always pushed down, so a tightly scoped refresh can still turn into a full target-table scan. This is the issue described in [apache/iceberg#11248](https://github.com/apache/iceberg/issues/11248). [apache/spark#52185](https://github.com/apache/spark/pull/52185) proposed pushing `WHEN NOT MATCHED BY SOURCE` predicates to the target because the current behavior can be too expensive for practical use.

Partition overwrite is only a partial answer.
It works when the replacement scope maps cleanly to Iceberg partitions. Many useful scopes are business keys or multi-column logical groups, not physical partitions. Even when the scope is partition-like, users should not have to reimplement Iceberg file rewrite behavior outside the Spark write path.

## Current options

The straightforward SQL workaround is two transactions:

```sql
DELETE FROM target_table
WHERE (scope_col_1, scope_col_2) IN (SELECT scope_col_1, scope_col_2 FROM source_view);

INSERT INTO target_table SELECT * FROM source_view;
```

That has real drawbacks:

- The replacement is not atomic. Readers can see the delete before the insert.
- Concurrent writers have a larger conflict window across two commits.
- The source may need to be materialized outside the query so the delete scope and inserted rows come from the same source evaluation.
- Duplicate source keys, empty sources, null scope values, and concurrent writes are easy to handle incorrectly.

In my internal use case, I worked around this by dropping below Spark SQL and using Iceberg's low-level rewrite APIs from application code. The workaround has the shape of an Iceberg rewrite action:

1. Write the replacement rows through Spark.
2. Use `SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID` so the new `DataFile` references are staged in `FileRewriteCoordinator`.
3. Commit an `OverwriteFiles` operation that deletes the existing files for the touched scope and adds the staged replacement files.
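Step 3 is where most of the bookkeeping lives. As a rough illustration, the plain-Java sketch below models what a copy-on-write replacement has to compute before it can build the `OverwriteFiles` commit: the touched scopes, the data files that contain any touched row, the residual rows in those files that belong to untouched scopes, and the full source. Everything in it is hypothetical (`ScopedReplaceSketch`, `Row`, `DataFileModel`, and `Plan` are invented for illustration and are not Iceberg classes). Files are modeled as in-memory lists of rows; merge-on-read, null scope values, concurrency, and real file I/O are not modeled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model, NOT Iceberg API: a "data file" is a list of
// rows, and a row's scope is the list of its REPLACE USING column values.
public class ScopedReplaceSketch {

  record Row(List<String> scope, String value) {}

  record DataFileModel(String path, List<Row> rows) {}

  // What one OverwriteFiles-style commit would delete and (re)write.
  record Plan(List<DataFileModel> filesToDelete, List<Row> rowsToWrite) {}

  static Plan plan(List<DataFileModel> targetFiles, List<Row> source) {
    // Touched scopes are the distinct scope keys in the source; duplicate
    // keys collapse here. Null scope values are not modeled in this sketch.
    Set<List<String>> touched = new HashSet<>();
    for (Row r : source) {
      touched.add(r.scope());
    }

    List<DataFileModel> filesToDelete = new ArrayList<>();
    List<Row> rowsToWrite = new ArrayList<>();
    for (DataFileModel file : targetFiles) {
      boolean overlaps = file.rows().stream().anyMatch(r -> touched.contains(r.scope()));
      if (!overlaps) {
        continue; // files without touched rows stay in the table unchanged
      }
      filesToDelete.add(file);
      // Copy-on-write: rows from untouched scopes in a deleted file are
      // residuals and must be rewritten, or they would be lost.
      for (Row r : file.rows()) {
        if (!touched.contains(r.scope())) {
          rowsToWrite.add(r);
        }
      }
    }
    // Every source row is written exactly once, including duplicate keys.
    rowsToWrite.addAll(source);
    return new Plan(filesToDelete, rowsToWrite);
  }

  // Table contents after committing the plan as a single snapshot.
  static List<Row> apply(List<DataFileModel> targetFiles, Plan plan) {
    List<Row> result = new ArrayList<>();
    for (DataFileModel file : targetFiles) {
      if (!plan.filesToDelete().contains(file)) {
        result.addAll(file.rows());
      }
    }
    result.addAll(plan.rowsToWrite());
    return result;
  }

  public static void main(String[] args) {
    List<DataFileModel> target = List.of(
        new DataFileModel("f1", List.of(
            new Row(List.of("cust-1", "2024-01-01"), "old-a"),
            new Row(List.of("cust-1", "2024-01-01"), "old-b"),
            new Row(List.of("cust-2", "2024-01-01"), "keep-c"))),
        new DataFileModel("f2", List.of(
            new Row(List.of("cust-3", "2024-01-01"), "keep-d"))));
    // The refreshed cust-1 group is smaller than the old one.
    List<Row> source = List.of(new Row(List.of("cust-1", "2024-01-01"), "new-a"));

    Plan replacement = plan(target, source);
    System.out.println("files deleted: " + replacement.filesToDelete().size());
    for (Row r : apply(target, replacement)) {
      System.out.println(r.scope() + " -> " + r.value());
    }
  }
}
```

In the example, `f1` is deleted because it holds `cust-1` rows, its `cust-2` row is carried into the rewritten output as a residual, `f2` is untouched, and `cust-1` ends up with one row instead of two, which is the shrinking-group case described above. An empty source touches no scopes, so the plan deletes and writes nothing.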

The staging write uses this public Iceberg mechanism:

```java
// With this option set, the written DataFiles are staged under fileSetId in
// FileRewriteCoordinator instead of being committed to the table.
data.writeTo(tablePath)
    .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetId)
    .append();
```

The application then commits the file replacement manually:

```java
OverwriteFiles overwrite = icebergTable.newOverwrite();
// Remove the files that hold rows in the touched scope...
for (DataFile file : existingFiles) {
  overwrite.deleteFile(file);
}
// ...and add the staged replacement files in the same commit.
for (DataFile file : newFiles) {
  overwrite.addFile(file);
}
if (currentSnapshot != null) {
  // Reject the commit if a snapshot committed after currentSnapshot deleted
  // data or added delete files matching partitionFilter.
  overwrite.validateFromSnapshot(currentSnapshot.snapshotId());
  overwrite.conflictDetectionFilter(partitionFilter);
  overwrite.validateNoConflictingDeletes();
}
overwrite.commit();
```

This gives me a single snapshot, but it is not an API I would expect most Spark users to build correctly. The application has to stage files, preserve residual rows from rewritten files, wire conflict detection, and restrict the operation to the cases it can model safely.

## Proposed semantics

Add a Spark write command for scoped replacement:

```sql
INSERT INTO target_table
REPLACE USING (col1 [, col2 ...])
<query>
```

The command would:

- evaluate the source query as the replacement data;
- compute the touched scope from the distinct `REPLACE USING` columns in the source;
- delete target rows whose scope matches the source scope;
- append all source rows once;
- commit the delete and append in one Iceberg snapshot;
- leave target scopes not present in the source unchanged.

An arbitrary-condition form could come later:

```sql
INSERT INTO target_table
REPLACE ON <boolean_expr>
<query>
```

The column-list form is enough for the common group refresh case and gives Spark users a clear way to express the intent.

## Rationale

Delta Lake's `replaceUsing` and `replaceOn` are useful prior art. I am not asking Iceberg to match Delta's API. I am asking for the same underlying operation to be expressible in Iceberg Spark: one atomic replacement for logical scopes touched by the source.

This should run through Iceberg's Spark write path.
Then table configuration can still decide copy-on-write, merge-on-read, and deletion-vector behavior, and users do not have to choose between a two-commit delete plus insert, a stretched `MERGE INTO`, or hand-written rewrite code with conflict validation.

### Query engine

Spark

### Willingness to contribute

- [x] I can contribute this improvement/feature independently
- [x] I would be willing to contribute this improvement/feature with guidance from the Iceberg community
- [ ] I cannot contribute this improvement/feature at this time
