annurahar opened a new pull request, #15396:
URL: https://github.com/apache/iceberg/pull/15396

   Closes #15395
   
   ## Summary
   
   This PR fixes a bug where the Kafka Connect sink does not evolve schema for 
nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or 
`map<string, struct<...>>` when schema evolution is enabled.
   
   ## Problem
   
   When `iceberg.tables.evolve-schema-enabled=true`, the `RecordConverter` only 
checks top-level fields for schema evolution. It does not recursively traverse 
nested fields inside complex types. As a result:
   
   - New nested fields defined in the record schema are silently ignored
   - The Iceberg table schema is never updated to include these nested fields
   - No error or warning is raised; the connector keeps running as if nothing were wrong
   
   ### Example Scenario
   
   **Record Schema** (Kafka Connect):
   ```
   orders (
     order_id: int
     customer: struct
       - customer_id: int
       - customer_details: struct (NEW)   <-- Not detected
           - name: string
           - email: string
   )
   ```
   
   **Iceberg Table Schema** (before):
   ```
   orders (
     order_id: int
     customer: struct
       - customer_id: int
   )
   ```
   
   **Expected Behavior**: The `customer_details` nested struct should be added 
to the Iceberg table schema.
   
   **Actual Behavior**: The `customer_details` field is ignored; schema remains 
unchanged.
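
   To make the gap concrete, here is a minimal, self-contained sketch of a 
comparison that stops at top-level fields, applied to the two schemas above. 
`TopLevelOnlyCheck`, `Node`, and `missingTopLevel` are hypothetical names used 
only for illustration; this is not the connector's `RecordConverter` code, and 
the real schema types come from Kafka Connect and Iceberg.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration only; not the connector's RecordConverter.
   final class TopLevelOnlyCheck {

     // Minimal schema node: a type name plus child fields (non-empty only for structs).
     static final class Node {
       final String type;
       final Map<String, Node> fields = new LinkedHashMap<>();

       Node(String type) {
         this.type = type;
       }

       Node with(String name, Node child) {
         fields.put(name, child);
         return this;
       }
     }

     // Reports record fields missing from the table, looking at the first level only.
     static List<String> missingTopLevel(Node record, Node table) {
       List<String> missing = new ArrayList<>();
       for (String name : record.fields.keySet()) {
         if (!table.fields.containsKey(name)) {
           missing.add(name);
         }
       }
       return missing;
     }

     // Record schema from the example: customer gains a nested customer_details struct.
     static Node recordSchema() {
       Node details = new Node("struct")
           .with("name", new Node("string"))
           .with("email", new Node("string"));
       Node customer = new Node("struct")
           .with("customer_id", new Node("int"))
           .with("customer_details", details);
       return new Node("struct")
           .with("order_id", new Node("int"))
           .with("customer", customer);
     }

     // Table schema from the example, before evolution.
     static Node tableSchema() {
       Node customer = new Node("struct").with("customer_id", new Node("int"));
       return new Node("struct")
           .with("order_id", new Node("int"))
           .with("customer", customer);
     }

     public static void main(String[] args) {
       // Both top-level names already exist in the table, so nothing is reported
       // even though customer.customer_details is new.
       System.out.println(missingTopLevel(recordSchema(), tableSchema())); // prints []
     }
   }
   ```

   The empty result mirrors the actual behavior described above: `order_id` and 
`customer` both exist in the table, so a check limited to the first level never 
looks inside `customer`.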
   
   ## Solution
   
   ### Key Changes
   
   1. **Added `evolveSchema(SinkRecord, SchemaUpdate.Consumer)` method** in 
`RecordConverter`:
      - Separate public method for schema evolution
      - Traverses the entire record schema to detect all schema changes (new 
fields, type updates, optionality changes)
      - Does NOT convert data - only collects schema updates
   
   2. **Simplified `convert(SinkRecord)` method**:
      - Now only handles data conversion
      - No longer takes `SchemaUpdate.Consumer` parameter
   
   3. **Added recursive schema evolution** for nested fields:
      - `evolveSchemaFromStruct()` - Handles Struct records with schema
      - `evolveSchemaFromMap()` - Handles schemaless Map records
      - `evolveFieldSchema()` - Common logic for processing individual fields
      - `evolveNestedSchemaFromStruct()` - Recursively processes nested fields 
in Struct records
      - `evolveNestedSchemaFromMapValue()` - Recursively processes nested 
fields in Map records
      - `evolveMapAsStruct()` - Processes Map values as struct fields
      - `evolveFirstStructElementInList()` - Handles schema evolution for 
`list<struct>`
      - `evolveFirstStructValueInMap()` - Handles schema evolution for 
`map<string, struct>`
   
   4. **Correctly determines parent field names** for nested columns, resolving 
each name from the relevant field ID:
      - `struct<struct>` → the parent struct's field ID
      - `list<struct>` → the list element's field ID
      - `map<string, struct>` → the map value's field ID
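
   The recursion described in items 3 and 4 can be sketched roughly as follows. 
This is a simplified illustration, not the code in this PR: 
`NestedEvolutionSketch`, `Node`, and `AddColumn` are hypothetical stand-ins for 
the Kafka Connect and Iceberg types, the `items` and `attrs` fields are invented 
to exercise the list and map cases, and the `element` / `value` path segments 
assume Iceberg's naming for list elements and map values. Only new fields are 
collected; type and optionality changes are left out.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration only; not the code added by this PR.
   final class NestedEvolutionSketch {

     enum Kind { PRIMITIVE, STRUCT, LIST, MAP }

     // Simplified schema node standing in for Kafka Connect / Iceberg types.
     static final class Node {
       final Kind kind;
       final Map<String, Node> fields = new LinkedHashMap<>(); // struct children
       final Node inner; // list element or map value, otherwise null

       Node(Kind kind, Node inner) {
         this.kind = kind;
         this.inner = inner;
       }

       static Node primitive() { return new Node(Kind.PRIMITIVE, null); }
       static Node struct() { return new Node(Kind.STRUCT, null); }
       static Node list(Node element) { return new Node(Kind.LIST, element); }
       static Node map(Node value) { return new Node(Kind.MAP, value); }

       Node with(String name, Node child) {
         fields.put(name, child);
         return this;
       }
     }

     // A pending "add column" request: parent path (null at top level) and field name.
     static final class AddColumn {
       final String parent;
       final String name;

       AddColumn(String parent, String name) {
         this.parent = parent;
         this.name = name;
       }

       @Override
       public String toString() {
         return join(parent, name);
       }
     }

     static String join(String parent, String name) {
       return parent == null ? name : parent + "." + name;
     }

     static List<AddColumn> collect(Node record, Node table) {
       List<AddColumn> out = new ArrayList<>();
       walk(record, table, null, out);
       return out;
     }

     private static void walk(Node record, Node table, String path, List<AddColumn> out) {
       if (record.kind != table.kind) {
         return; // type changes are out of scope for this sketch
       }
       switch (record.kind) {
         case STRUCT:
           for (Map.Entry<String, Node> e : record.fields.entrySet()) {
             Node existing = table.fields.get(e.getKey());
             if (existing == null) {
               out.add(new AddColumn(path, e.getKey())); // new field under this struct
             } else {
               walk(e.getValue(), existing, join(path, e.getKey()), out);
             }
           }
           break;
         case LIST:
           walk(record.inner, table.inner, join(path, "element"), out);
           break;
         case MAP:
           walk(record.inner, table.inner, join(path, "value"), out);
           break;
         default:
           break; // primitives have no children
       }
     }

     static Node exampleRecord() {
       Node details = Node.struct().with("name", Node.primitive()).with("email", Node.primitive());
       Node customer = Node.struct().with("customer_id", Node.primitive()).with("customer_details", details);
       Node item = Node.struct().with("sku", Node.primitive()).with("qty", Node.primitive());
       Node attr = Node.struct().with("code", Node.primitive()).with("label", Node.primitive());
       return Node.struct().with("order_id", Node.primitive()).with("customer", customer)
           .with("items", Node.list(item)).with("attrs", Node.map(attr));
     }

     static Node exampleTable() {
       Node customer = Node.struct().with("customer_id", Node.primitive());
       Node item = Node.struct().with("sku", Node.primitive());
       Node attr = Node.struct().with("code", Node.primitive());
       return Node.struct().with("order_id", Node.primitive()).with("customer", customer)
           .with("items", Node.list(item)).with("attrs", Node.map(attr));
     }

     public static void main(String[] args) {
       System.out.println(collect(exampleRecord(), exampleTable()));
     }
   }
   ```

   On the sample schemas, `collect` reports `customer_details` with parent 
`customer`, `qty` with parent `items.element`, and `label` with parent 
`attrs.value`, which is the parent/name pairing an add-column call would need.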
   
   ## Configuration
   
   To enable nested schema evolution, the connector must be configured to fetch 
and use the most recent schema version from the Schema Registry, rather than 
relying on the schema embedded in each message. Add the following configuration 
properties:
   
   ```properties
   # Disable embedded schema in messages
   value.converter.schemas.enable=false
   key.converter.schemas.enable=false
   
   # Enable enhanced Avro schema support
   value.converter.enhanced.avro.schema.support=true
   key.converter.enhanced.avro.schema.support=true
   
   # Use latest schema version from Schema Registry
   key.converter.use.latest.version=true
   value.converter.use.latest.version=true
   
   # Enable auto schema evolution
   value.converter.auto.evolve=true
   key.converter.auto.evolve=true
   ```
   
   ## Testing
   
   Added two new test cases:
   
   1. **`testNestedSchemaEvolutionFromStruct`** - Tests nested schema evolution 
using Struct records (with schema)
   2. **`testNestedSchemaEvolutionFromMap`** - Tests nested schema evolution 
using Map records (schemaless)
   
   Both tests verify that:
   - New fields inside nested structs are detected
   - The correct parent field name is set (e.g., `outer.inner`)
   - The correct field type is inferred
   
   All existing tests continue to pass.
   
   ## Checklist
   
   - [x] New fields inside `struct<struct<...>>` are detected and added
   - [x] New fields inside `list<struct<...>>` are detected and added
   - [x] New fields inside `map<string, struct<...>>` are detected and added
   - [x] Type updates for nested primitive fields are detected
   - [x] Optionality changes for nested fields are detected
   - [x] Works for both Struct (with schema) and Map (schemaless) records
   - [x] No breaking changes to existing functionality
   - [x] All tests pass
   - [x] Code formatted with Spotless


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

