wombatu-kun opened a new pull request, #16790:
URL: https://github.com/apache/iceberg/pull/16790

   `RowDataProjection` projects a Flink `RowData` to a subset schema. When a 
projected field is itself a struct, the field getter rebuilt a whole nested 
`RowDataProjection` on every row:
   
   ```java
   // Before this PR: inside the nested-struct field getter, executed for every row
   RowData nestedRow = row.getRow(position, nestedRowType.getFieldCount());
   return RowDataProjection.create(
           nestedRowType, rowField.type().asStructType(), projectField.type().asStructType())
       .wrap(nestedRow);
   ```
   
   Each call allocates the nested projection's field-id-to-position map, its 
field-getter array, and the recursively built child getters, so every row with 
a projected nested struct pays for the full projection setup.
   
   This PR builds the nested projection once, when the parent projection is 
constructed, and reuses it for every row, the same way core's `StructProjection` 
already caches its nested projections. The reuse is consistent with the 
existing contract: the top-level projection is already reused across rows via 
`wrap`, and projected rows are consumed before the next row is wrapped.
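
A minimal sketch of the caching pattern, written in plain Java so it runs without Flink or Iceberg on the classpath. `FieldSpec` and `CachedProjection` are hypothetical simplifications, not the PR's code: rows and nested structs are modeled as `Object[]`, and each projected field is just a source position. The nested projections are built in the constructor, and `get` re-points the cached one at the current row's struct instead of creating a new projection.

```java
import java.util.List;

// Hypothetical spec for one projected field (not an Iceberg/Flink type): the source
// position to read and, when the field is a struct, the specs of its projected sub-fields.
record FieldSpec(int sourcePosition, List<FieldSpec> nested) {
  static FieldSpec leaf(int sourcePosition) {
    return new FieldSpec(sourcePosition, null);
  }

  static FieldSpec struct(int sourcePosition, FieldSpec... nested) {
    return new FieldSpec(sourcePosition, List.of(nested));
  }
}

// Simplified stand-in for a reusable row projection; rows and nested structs are Object[].
final class CachedProjection {
  private final FieldSpec[] fields;
  // One nested projection per struct field, built once here instead of once per row.
  private final CachedProjection[] nestedProjections;
  private Object[] row;

  CachedProjection(List<FieldSpec> specs) {
    this.fields = specs.toArray(new FieldSpec[0]);
    this.nestedProjections = new CachedProjection[fields.length];
    for (int i = 0; i < fields.length; i++) {
      if (fields[i].nested() != null) {
        nestedProjections[i] = new CachedProjection(fields[i].nested());
      }
    }
  }

  // Points the projection at a new source row; nothing is allocated here.
  CachedProjection wrap(Object[] newRow) {
    this.row = newRow;
    return this;
  }

  int size() {
    return fields.length;
  }

  Object get(int pos) {
    Object value = row[fields[pos].sourcePosition()];
    CachedProjection nested = nestedProjections[pos];
    if (nested == null || value == null) {
      return value;
    }
    // Re-point the cached nested projection. The returned view is overwritten by the
    // next wrap, so callers must consume it before moving to another row.
    return nested.wrap((Object[]) value);
  }
}
```

Wrapping a second row returns the same nested `CachedProjection` instance, now pointing at the second row's struct. That is the reuse contract described above: a projected row must be consumed before the next row is wrapped.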
   
   ### When this runs
   
   There are two production callers of `RowDataProjection`, and the 
nested-struct branch is taken only when a nested struct is present in the 
projected or equality-key schema:
   
   - **Source reads** (`RowDataFileScanTaskReader`): reading a table that has 
equality deletes (the typical Flink upsert/CDC output) while projecting a 
nested struct column, when the projection does not already include every 
column the deletes require. The reader then projects each row to drop the 
extra columns it read only to apply deletes, and before this change the 
projected struct field rebuilt its nested projection for every row.
   - **Upsert sinks** (`BaseDeltaTaskWriter`): when the equality or identifier 
fields include a nested sub-field. The delete-key schema is built with 
`TypeUtil.select`, which keeps the enclosing struct, so the per-row 
`keyProjection.wrap(row)` has to project that struct.
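
To illustrate why the upsert path reaches the nested-struct branch, here is a simplified, self-contained Java model of id-based column selection. `SchemaField` and `SelectSketch.select` are hypothetical stand-ins, not Iceberg's `TypeUtil.select`; the sketch only models the behavior described above, that selecting a nested sub-field keeps its enclosing struct. Pruning that struct down to the selected sub-fields is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical schema node (not Iceberg's Types.NestedField).
// In this sketch a leaf is simply a field with no children.
record SchemaField(int id, String name, List<SchemaField> children) {
  static SchemaField leaf(int id, String name) {
    return new SchemaField(id, name, List.of());
  }

  static SchemaField struct(int id, String name, SchemaField... children) {
    return new SchemaField(id, name, List.of(children));
  }
}

final class SelectSketch {
  // A field whose id is selected is kept whole. Otherwise a struct is kept, pruned to
  // its selected descendants, if any descendant is selected; this is how a nested key
  // field pulls its enclosing struct into the key schema.
  static List<SchemaField> select(List<SchemaField> fields, Set<Integer> ids) {
    List<SchemaField> result = new ArrayList<>();
    for (SchemaField field : fields) {
      if (ids.contains(field.id())) {
        result.add(field);
      } else if (!field.children().isEmpty()) {
        List<SchemaField> kept = select(field.children(), ids);
        if (!kept.isEmpty()) {
          result.add(new SchemaField(field.id(), field.name(), kept));
        }
      }
    }
    return result;
  }
}
```

With the benchmark's source schema below and identifier fields `id` and `location.lat`, the sketch yields `id, location: struct<lat>`. Because that `location` is narrower than the source struct, projecting a row onto the key goes through the nested-struct getter instead of passing the struct through.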
   
   Flat-schema tables and reads without deletes never take this branch (they 
use native format projection or primitive getters), so they are unaffected. The 
change moves the nested projection setup from per-row to once per projection 
and is identical across the supported Flink versions, so this PR applies it to 
v1.20, v2.0, and v2.1.
   
   ### Benchmark
   
   JMH microbenchmark (JDK 17, `-prof gc`). Source schema 
`id: long, location: struct<lat: double, lon: double, name: string>, ts: long`; 
projected schema `location: struct<lat: double, lon: double>` (the `name` 
subfield is dropped so the struct is projected rather than passed through). 
Each invocation wraps a pre-built row, reads the projected `location` struct, 
and reads its two `double` subfields.
   
   | Benchmark | Metric | Before | After | Delta |
   | --- | --- | --- | --- | --- |
   | projectNestedStruct | time | 121.95 ns/op | 23.45 ns/op | -80.8% |
   | projectNestedStruct | alloc | 448.0 B/op | 48.0 B/op | -89.3% |
   
   Existing `TestRowDataProjection` coverage passes unchanged.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.