rayhondo opened a new pull request, #16249:
URL: https://github.com/apache/iceberg/pull/16249

   ## Problem
   
   `TableMetadataCache.schema()` in `DynamicIcebergSink` iterates over **all historical table schemas** when looking for a write target and returns on the first `SAME` result from `CompareSchemasVisitor`. `CompareSchemasVisitor` matches fields by name and type, not by field ID. Older schemas have lower IDs and are iterated first. As a result, the sink deterministically resolves to a historical schema and writes with its field IDs, even when the current table schema has evolved past it.
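A minimal model of the pre-fix lookup makes the shadowing concrete. This is a hypothetical sketch, not the real `TableMetadataCache` code. `FirstMatchLookup`, `SchemaModel`, `shape`, and `resolveFirstMatch` are illustrative names. The comparison is reduced to an ordered name+type check, while the real `CompareSchemasVisitor` does considerably more. A `TreeMap` is assumed so that schemas are visited in ascending ID order, as described above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical model of the pre-fix lookup; not the real TableMetadataCache code.
final class FirstMatchLookup {
  record Field(int id, String name, String type) {}

  record SchemaModel(int schemaId, List<Field> fields) {}

  // Simplified stand-in for CompareSchemasVisitor: only names and types are compared,
  // so field IDs play no part in the match.
  static boolean sameShape(SchemaModel input, SchemaModel table) {
    return shape(input).equals(shape(table));
  }

  static List<String> shape(SchemaModel schema) {
    return schema.fields().stream()
        .map(f -> f.name() + ":" + f.type())
        .collect(Collectors.toList());
  }

  // Mirrors the pre-fix loop: visit every cached schema and return the first match.
  static SchemaModel resolveFirstMatch(SchemaModel input, Map<Integer, SchemaModel> tableSchemas) {
    for (Map.Entry<Integer, SchemaModel> entry : tableSchemas.entrySet()) {
      if (sameShape(input, entry.getValue())) {
        return entry.getValue();
      }
    }
    return null; // no match: the real code would go on to schema evolution
  }

  public static void main(String[] args) {
    SchemaModel v0 = new SchemaModel(0, List.of(
        new Field(1, "id", "long"), new Field(2, "data", "string"), new Field(3, "extra", "string")));
    SchemaModel v1 = new SchemaModel(1, List.of(
        new Field(1, "id", "long"), new Field(2, "data", "string")));
    // The producer still emits the pre-delete shape; its own field IDs do not affect the match.
    SchemaModel input = new SchemaModel(-1, List.of(
        new Field(100, "id", "long"), new Field(101, "data", "string"), new Field(102, "extra", "string")));

    Map<Integer, SchemaModel> tableSchemas = new TreeMap<>(Map.of(0, v0, 1, v1));
    // Prints 0: historical schema 0 shadows the current schema 1.
    System.out.println(resolveFirstMatch(input, tableSchemas).schemaId());
  }
}
```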
   
   ## Reproduction
   
   1. Create an Iceberg table with schema `[id, data, extra]` (e.g. a full Debezium CDC envelope with `op`, `after`, `before`, `source`, `ts_ms`, `transaction`).
   2. Delete a column: `table.updateSchema().deleteColumn("extra").commit()`.
   3. Send a `DynamicRecord` whose schema has the same structure as the original schema from step 1.
   4. `TableMetadataCache` matches schema id=0 (the historical, full schema) as `SAME` and uses its field IDs.
   5. Data is written with the old field IDs. The current schema (without `extra`) cannot resolve them on read, so the affected values come back as null.
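Iceberg readers resolve columns by field ID rather than by name, which is why step 5 loses data. The hypothetical model below projects a row written under schema 0's IDs onto a read schema. `ProjectionById` and `project` are illustrative names, not Iceberg APIs. The model also assumes, beyond what the reproduction states, that `extra` might later be re-added. Iceberg assigns a re-added column a new field ID (4 is used here for illustration), so the values stored under ID 3 would still read as null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of ID-based column resolution on read; not an Iceberg API.
final class ProjectionById {
  record Field(int id, String name) {}

  // Look up each read-schema column by field ID; an ID the row never stored reads as null.
  static List<Object> project(Map<Integer, Object> writtenRow, List<Field> readSchema) {
    List<Object> values = new ArrayList<>();
    for (Field field : readSchema) {
      values.add(writtenRow.get(field.id())); // Map.get returns null for a missing ID
    }
    return values;
  }

  public static void main(String[] args) {
    // Row written under historical schema 0: id=1, data=2, extra=3.
    Map<Integer, Object> row = Map.of(1, 42L, 2, "a", 3, "x");

    // Current schema (schema 1) has no field with ID 3, so "x" is unreachable.
    System.out.println(project(row, List.of(new Field(1, "id"), new Field(2, "data")))); // prints [42, a]

    // Assumption: "extra" is re-added later and gets a fresh ID (4 here), so the old value reads as null.
    System.out.println(project(row, List.of(
        new Field(1, "id"), new Field(2, "data"), new Field(4, "extra")))); // prints [42, a, null]
  }
}
```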
   
   ## Impact
   
   Silent data corruption. No exception is thrown: writes "succeed", but the affected columns return null on read indefinitely. This affects any table where columns were deleted after creation while an upstream producer continues to emit the pre-delete shape.
   
   ## Root cause
   
   `TableMetadataCache.schema()` (`flink/v2.0/.../TableMetadataCache.java`; the `flink/v2.1/...` copy is identical):
   
   ```java
   for (Map.Entry<Integer, Schema> tableSchema : cached.tableSchemas.entrySet()) {
       Result result = CompareSchemasVisitor.visit(input, tableSchema.getValue(), ...);
       if (result == SAME) {
           return new ResolvedSchemaInfo(tableSchema.getValue(), SAME, identity());
       }
       ...
   }
   ```
   
   `tableSchemas` is populated from `table.schemas()`, i.e. all historical schemas, not just the current one.
   
   ## Fix
   
   Resolve only against the current schema. `CacheItem` now tracks `currentSchemaId`, populated from `table.schema().schemaId()` in `update()`. The historical iteration is removed, because writing with a historical schema's field IDs is always wrong once the current schema has evolved.
   
   ```java
   Schema currentSchema = cached.tableSchemas.get(cached.currentSchemaId);
   if (currentSchema != null) {
       Result result = CompareSchemasVisitor.visit(input, currentSchema, caseSensitive, dropUnusedColumns);
       if (result == SAME) {
           return new ResolvedSchemaInfo(currentSchema, SAME, identity());
       }
       if (result == DATA_CONVERSION_NEEDED) {
           compatible = currentSchema;
       }
       // SCHEMA_UPDATE_NEEDED falls through to evolution as before
   }
   ```
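The fixed behavior can also be exercised outside Flink with a hypothetical, self-contained model. `CacheItem`, `update`, and `currentSchemaId` echo the names above, but several parts are assumptions for illustration: the types, the simplified name+type comparison, and the `null` return standing in for `NOT_FOUND`. The `DATA_CONVERSION_NEEDED` branch is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model of the fixed lookup; not the real TableMetadataCache code.
final class CurrentOnlyLookup {
  record Field(int id, String name, String type) {}

  record SchemaModel(int schemaId, List<Field> fields) {}

  // Stand-in for CacheItem: still caches every schema, but also remembers the current one.
  static final class CacheItem {
    final Map<Integer, SchemaModel> tableSchemas = new HashMap<>();
    int currentSchemaId = -1;

    // Stand-in for update(): the real code reads table.schemas() and table.schema().schemaId().
    void update(List<SchemaModel> allSchemas, int currentId) {
      tableSchemas.clear();
      for (SchemaModel schema : allSchemas) {
        tableSchemas.put(schema.schemaId(), schema);
      }
      currentSchemaId = currentId;
    }
  }

  // Simplified comparison: ordered name:type pairs; field IDs are ignored.
  static List<String> shape(SchemaModel schema) {
    return schema.fields().stream().map(f -> f.name() + ":" + f.type()).collect(Collectors.toList());
  }

  // Only the current schema may produce a match; null stands in for NOT_FOUND,
  // which sends the caller on to schema evolution.
  static SchemaModel resolve(SchemaModel input, CacheItem cached) {
    SchemaModel current = cached.tableSchemas.get(cached.currentSchemaId);
    if (current != null && shape(input).equals(shape(current))) {
      return current;
    }
    return null;
  }

  public static void main(String[] args) {
    SchemaModel v0 = new SchemaModel(0, List.of(
        new Field(1, "id", "long"), new Field(2, "data", "string"), new Field(3, "extra", "string")));
    SchemaModel v1 = new SchemaModel(1, List.of(
        new Field(1, "id", "long"), new Field(2, "data", "string")));
    CacheItem cached = new CacheItem();
    cached.update(List.of(v0, v1), 1);

    // The pre-delete shape no longer resolves against historical schema 0.
    System.out.println(resolve(new SchemaModel(-1, v0.fields()), cached)); // prints null
    // The current shape still resolves to schema 1.
    System.out.println(resolve(new SchemaModel(-1, v1.fields()), cached).schemaId()); // prints 1
  }
}
```

Keeping every schema in `tableSchemas` while matching only `currentSchemaId` mirrors the PR's choice: the cache contents are unchanged, and only the lookup is narrowed.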
   
   Applied identically to `flink/v2.0` and `flink/v2.1`. Not applied to 
`flink/v1.20` in this PR (happy to add if reviewers prefer one PR; the bug 
exists there too).
   
   ## Tests
   
   Added `testHistoricalSchemaDoesNotShadowCurrentAfterColumnDelete` to `TestTableMetadataCache` in both v2.0 and v2.1. The test creates a table with the full schema and deletes a column. It then asserts that submitting the original (pre-delete) schema returns `NOT_FOUND` (i.e. triggers proper schema evolution) rather than `SAME` against the historical schema.
   
   ## Behavior change
   
   Previously, an input schema structurally identical to *any* historical 
schema resolved as `SAME`. Now only the current schema can produce `SAME`. The 
previous behavior was the bug; any caller depending on it was producing corrupt 
writes.
   
   ## Versions
   
   Confirmed in 1.10.0 and 1.10.1; `TableMetadataCache` logic is identical. Not 
fixed in any released version as of this PR.
   
   ## AI-assisted contribution disclosure
   
   Per the [AI-assisted contributions guidelines](https://iceberg.apache.org/contribute/#guidelines-for-ai-assisted-contributions): AI assistance was used to draft the patch and tests. The author traced the root cause end-to-end against the upstream source, verified that the fix matches the documented intent of `TableMetadataCache`, and ran the affected module tests and `spotlessCheck` locally. Reviewers may want to focus on whether removing historical iteration affects any code path I haven't considered (none found in `iceberg-flink`).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

