liucao-dd opened a new pull request, #16795:
URL: https://github.com/apache/iceberg/pull/16795
Part 1 of #16785.
## What
This adds scoped replacement writes to the Iceberg Spark 4.1 extensions:
```sql
INSERT INTO target_table
REPLACE USING (scope_col_1, scope_col_2)
<query>
```
The command treats the source as the complete replacement for the scopes it
touches. It reads the distinct `REPLACE USING` scope tuples from the source,
deletes target rows whose scope shows up in the source, appends every source
row once, and commits the delete and append as a single Iceberg snapshot.
Scopes that the source never mentions are left alone. This is the "full group
refresh" operation from #16785: it is atomic, and it needs neither a two-commit
DELETE + INSERT nor a `MERGE INTO ... WHEN NOT MATCHED BY SOURCE` stretched to fit.
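
To make the semantics above concrete, here is a toy in-memory model in plain Python. It is not the PR's implementation: `replace_using`, the row layout, and the sample column names are all hypothetical, and it models only the resulting table contents, not snapshots or files. Scope tuples are compared with ordinary Python equality, so the sketch says nothing about how the PR treats NULL scope values.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def replace_using(target: List[Row], source: List[Row],
                  scope_cols: Tuple[str, ...]) -> List[Row]:
    """Toy model: return the table contents after a scoped replace."""

    def scope_of(row: Row) -> tuple:
        return tuple(row[c] for c in scope_cols)

    # 1. Collect the distinct scope tuples present in the source.
    touched = {scope_of(r) for r in source}
    # 2. Drop every target row whose scope appears in the source.
    kept = [r for r in target if scope_of(r) not in touched]
    # 3. Append every source row exactly once (duplicate scope keys are kept).
    return kept + list(source)


# Hypothetical sample data: scope is (region, day).
target = [
    {"region": "eu", "day": 1, "amount": 10},
    {"region": "eu", "day": 1, "amount": 20},
    {"region": "eu", "day": 2, "amount": 30},
    {"region": "us", "day": 1, "amount": 40},
]
source = [
    {"region": "eu", "day": 1, "amount": 11},
    {"region": "eu", "day": 1, "amount": 12},  # same scope key, still appended
]

result = replace_using(target, source, ("region", "day"))
unchanged = replace_using(target, [], ("region", "day"))  # empty source
```

In this model the two old `("eu", 1)` rows disappear, both source rows land in the table, and the `("eu", 2)` and `("us", 1)` scopes are untouched. With an empty source no scope is touched, so the table is unchanged.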
## Scope of this PR
This PR covers part of #16785. Two things are left for later on purpose:
- Only the column-list form (`REPLACE USING (cols)`) is implemented. The
arbitrary-condition form (`REPLACE ON <expr>`) from the issue is not here yet.
- There is no native Spark grammar. Spark's `insertInto` grammar does not
accept `REPLACE USING`, and the Iceberg extension grammar cannot own an
arbitrary trailing Spark query. So this PR uses an in-place text split as a
stopgap: the Iceberg ANTLR grammar parses only the command head
(`singleScopedReplaceHead`: `INSERT INTO t REPLACE USING (cols)`), and the
remaining query tail goes to the wrapped Spark parser. Detection is anchored to
the statement head with literals and comments masked, so a `REPLACE USING (`
later in the query body (say, a join alias) does not trigger the path. The hack
is only here to demonstrate the semantics. The end state I want is to propose
`REPLACE USING` / `REPLACE ON` for Spark's `insertInto` grammar and build the
logical plan from the native parse tree instead. There is a `TODO` to that
effect in `IcebergSparkSqlExtensionsParser`.
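
The head-anchored detection described above can be sketched roughly as follows. This is an illustration in Python, not the code in this PR: the real path parses the head with the Iceberg ANTLR grammar, whereas this sketch uses regular expressions, and the names `mask` and `split_scoped_replace` are hypothetical. It also simplifies heavily (no nested block comments, a naive table-identifier pattern).

```python
import re
from typing import List, Optional, Tuple

# String literals and comments, which must not influence detection.
_MASKABLE = re.compile(
    r"'(?:[^'\\]|\\.)*'"      # single-quoted string literal
    r'|"(?:[^"\\]|\\.)*"'     # double-quoted string literal
    r"|--[^\n]*"              # line comment
    r"|/\*.*?\*/",            # block comment (non-nested, a simplification)
    re.DOTALL,
)

# The command head, matched only at the start of the statement.
_HEAD = re.compile(
    r"\s*INSERT\s+INTO\s+(?P<table>[\w.`]+)\s+REPLACE\s+USING\s*"
    r"\((?P<cols>[^)]*)\)",
    re.IGNORECASE,
)


def mask(sql: str) -> str:
    """Blank out literals and comments, keeping every character offset."""
    return _MASKABLE.sub(lambda m: " " * len(m.group()), sql)


def split_scoped_replace(sql: str) -> Optional[Tuple[str, List[str], str]]:
    """Return (table, scope columns, query tail), or None if not a match."""
    m = _HEAD.match(mask(sql))  # match() anchors at the statement head
    if m is None:
        return None
    cols = [c.strip() for c in m.group("cols").split(",")]
    # Offsets are preserved by mask(), so slice the tail from the original.
    return m.group("table"), cols, sql[m.end():].strip()
```

Because masking preserves offsets, the tail is sliced from the original text and can be handed to the wrapped parser unmodified. A statement such as `INSERT INTO t SELECT * FROM s JOIN replace USING (id)` returns `None` here, since `REPLACE USING (` does not directly follow the target table.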
## How it works
- A new logical node, `ReplaceScopedData(table, scopeColumns, source)`.
- `RewriteScopedReplace` lowers it through Iceberg's existing row-level
write path. It requests the operation as `MERGE` so the table configuration
decides between copy-on-write and merge-on-read (and deletion vectors):
  - COW → `ReplaceData`
  - MOR → `WriteDelta`
- It reuses the standard row-level group filtering and runtime filtering, so
a tightly scoped replace stays scoped instead of scanning the whole target.
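
As a rough mental model of the two write modes and of why group filtering keeps the work scoped, consider the toy Python sketch below. It is an assumption-laden illustration, not Iceberg code: the file layout, the function names `copy_on_write` and `merge_on_read`, and the returned dictionaries are all hypothetical. It assumes copy-on-write rewrites only the files that contain a touched scope, and that merge-on-read leaves data files in place and records position deletes.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int]            # (scope value, payload), hypothetical layout
Files = Dict[str, List[Row]]     # data file name -> rows stored in it


def copy_on_write(files: Files, source: List[Row]) -> dict:
    """Toy COW: replace whole files that hold any touched scope."""
    touched = {scope for scope, _ in source}
    # Group filtering: only files with at least one touched row are rewritten.
    rewritten = [name for name, rows in files.items()
                 if any(scope in touched for scope, _ in rows)]
    # Rows in those files that are outside the touched scopes are carried over.
    carried = [row for name in rewritten for row in files[name]
               if row[0] not in touched]
    return {"removed_files": rewritten, "added_rows": carried + list(source)}


def merge_on_read(files: Files, source: List[Row]) -> dict:
    """Toy MOR: keep data files, record (file, position) deletes instead."""
    touched = {scope for scope, _ in source}
    deletes = [(name, pos)
               for name, rows in files.items()
               for pos, (scope, _) in enumerate(rows)
               if scope in touched]
    return {"position_deletes": deletes, "added_rows": list(source)}


files = {"f1": [("a", 1), ("b", 2)], "f2": [("c", 3)]}
source = [("a", 10)]

cow = copy_on_write(files, source)
mor = merge_on_read(files, source)
```

In both toy paths file `f2` is never touched, because none of its rows fall in scope `a`. The two modes differ only in whether the unaffected row `("b", 2)` is rewritten along with its file or left where it is.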
## Tests
- `TestReplaceScopedData` covers the shared semantics: scope deletion, full
source append, untouched scopes, empty source, duplicate scope keys, null scope
values, and multi-column scopes.
- `TestCopyOnWriteReplaceScopedData` and `TestMergeOnReadReplaceScopedData`
exercise both write modes.
## Docs
- `docs/docs/spark-writes.md` documents the supported `REPLACE USING` form
and notes that `REPLACE ON` and the native grammar are future work.
## Follow-ups
1. Propose native Spark grammar for `REPLACE USING` / `REPLACE ON` and
remove the text-split workaround.
2. Implement the `REPLACE ON <boolean_expr>` form.