wombatu-kun opened a new pull request, #16790:
URL: https://github.com/apache/iceberg/pull/16790
`RowDataProjection` projects a Flink `RowData` to a subset schema. When a
projected field is itself a struct, the field getter rebuilt a whole nested
`RowDataProjection` on every row:
```java
// Previous field-getter body for a projected nested struct, executed once per row
RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount());
return RowDataProjection.create(
        nestedRowType,
        rowField.type().asStructType(),
        projectField.type().asStructType())
    .wrap(nestedRow);
```
Each call allocates the nested projection's field-id-to-position map, its
field-getter array, and the recursively built child getters, so every row with
a projected nested struct pays for the full projection setup.
This PR builds the nested projection once, when the parent projection is
constructed, and reuses it for every row, the same way core's `StructProjection`
already caches its nested projections. The reuse is consistent with the
existing contract: the top-level projection is already reused across rows via
`wrap`, and projected rows are consumed before the next row is wrapped.
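A minimal, stdlib-only model of the change may help. It assumes simplified types: `NestedProjection` and `StructGetters` are hypothetical stand-ins for the real `RowDataProjection` and its field getters, and rows are plain `Object[]` arrays rather than Flink `RowData`. The sketch counts how often the nested setup runs under the old per-row getter and under the cached one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a nested RowDataProjection (not the real class).
// Construction does the setup work (id-to-position map, position array)
// that the old getter repeated for every row.
final class NestedProjection {
  static int builds = 0; // how many times the setup has run

  private final int[] positions;
  private Object[] current;

  NestedProjection(int[] sourceIds, int[] projectedIds) {
    builds++;
    Map<Integer, Integer> idToPos = new HashMap<>();
    for (int i = 0; i < sourceIds.length; i++) {
      idToPos.put(sourceIds[i], i);
    }
    positions = new int[projectedIds.length];
    for (int i = 0; i < projectedIds.length; i++) {
      positions[i] = idToPos.get(projectedIds[i]);
    }
  }

  // Points this projection at a new source row; nothing is copied.
  NestedProjection wrap(Object[] row) {
    current = row;
    return this;
  }

  Object get(int pos) {
    return current[positions[pos]];
  }
}

final class StructGetters {
  // Source struct <lat, lon, name> with ids 1, 2, 3; projected struct <lat, lon>.
  static final int[] SOURCE_IDS = {1, 2, 3};
  static final int[] PROJECTED_IDS = {1, 2};

  // Before: every call rebuilds the nested projection.
  static Function<Object[], NestedProjection> perRow() {
    return row -> new NestedProjection(SOURCE_IDS, PROJECTED_IDS).wrap(row);
  }

  // After: built once when the getter is created, then reused for each row.
  static Function<Object[], NestedProjection> cached() {
    NestedProjection nested = new NestedProjection(SOURCE_IDS, PROJECTED_IDS);
    return nested::wrap;
  }
}
```

Because `cached()` hands back the same instance on every call, a caller that held on to an earlier result would see the next row's values after the following `wrap`. That is why the reuse relies on each projected row being consumed before the next row is wrapped.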
### When this runs
There are two production callers of `RowDataProjection`, and the
nested-struct branch is taken only when a nested struct is present in the
projected or equality-key schema:
- **Source reads** (`RowDataFileScanTaskReader`): reading a table that has
equality deletes (the typical Flink upsert/CDC output) while projecting a
nested struct column, where the projection does not already include every
column the deletes require. The reader then projects each row to drop the extra
delete-applying columns, and the projected struct rebuilds its nested
projection per row.
- **Upsert sinks** (`BaseDeltaTaskWriter`): when the equality / identifier
fields include a nested sub-field. The delete-key schema is built with
`TypeUtil.select`, which carries the enclosing struct, so the per-row
`keyProjection.wrap(row)` projects that struct.
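The following simplified, stdlib-only model of field-id selection shows why the key projection ends up projecting a struct. `SchemaField` and `FieldSelect` are hypothetical and only approximate `TypeUtil.select`: a selected id keeps its whole subtree, and an unselected struct is kept with just its selected descendants. Picking the nested `lat` id as a key field therefore yields a `location` struct rather than a bare `double`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified schema node (not Iceberg's Types.NestedField):
// a primitive has no children, a struct has one or more.
final class SchemaField {
  final int id;
  final String name;
  final List<SchemaField> children;

  SchemaField(int id, String name, List<SchemaField> children) {
    this.id = id;
    this.name = name;
    this.children = List.copyOf(children);
  }

  boolean isStruct() {
    return !children.isEmpty();
  }

  @Override
  public String toString() {
    if (!isStruct()) {
      return name;
    }
    StringBuilder sb = new StringBuilder(name).append(": struct<");
    for (int i = 0; i < children.size(); i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(children.get(i));
    }
    return sb.append('>').toString();
  }
}

final class FieldSelect {
  // Returns the pruned field, or null if nothing under it is selected.
  static SchemaField select(SchemaField field, Set<Integer> ids) {
    if (ids.contains(field.id)) {
      return field; // a selected id keeps its full subtree
    }
    List<SchemaField> kept = new ArrayList<>();
    for (SchemaField child : field.children) {
      SchemaField selected = select(child, ids);
      if (selected != null) {
        kept.add(selected);
      }
    }
    // An unselected struct survives only as the carrier of selected descendants.
    return kept.isEmpty() ? null : new SchemaField(field.id, field.name, kept);
  }

  static List<SchemaField> select(List<SchemaField> schema, Set<Integer> ids) {
    List<SchemaField> result = new ArrayList<>();
    for (SchemaField field : schema) {
      SchemaField selected = select(field, ids);
      if (selected != null) {
        result.add(selected);
      }
    }
    return result;
  }
}
```

With a schema `id, location: struct<lat, lon, name>` and key ids for `id` and `location.lat`, the selected key schema is `[id, location: struct<lat>]`, so wrapping each row with the key projection goes through the nested-struct getter.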
Flat-schema tables, and reads without deletes, never take this branch (they
use native format projection or primitive getters), so they are unaffected. The
change moves the nested projection setup from once per row to once per
projection and is identical across the supported Flink versions, so it is
applied to v1.20, v2.0, and v2.1 in this PR.
### Benchmark
JMH microbenchmark (JDK 17, `-prof gc`). Source schema `id: long, location:
struct<lat: double, lon: double, name: string>, ts: long`; projected schema
`location: struct<lat: double, lon: double>` (the `name` subfield is dropped so
the struct is projected rather than passed through). Each invocation wraps a
pre-built row, reads the projected `location` struct, and reads its two
`double` subfields.
| Benchmark | Metric | Before | After | Delta |
| --- | --- | --- | --- | --- |
| projectNestedStruct | time | 121.95 ns/op | 23.45 ns/op | -80.8% |
| projectNestedStruct | alloc | 448.0 B/op | 48.0 B/op | -89.3% |
Existing `TestRowDataProjection` coverage passes unchanged.