annurahar opened a new pull request, #15396:
URL: https://github.com/apache/iceberg/pull/15396
Closes #15395
## Summary
This PR fixes a bug where the Kafka Connect sink does not evolve the schema for
nested fields inside `struct<struct<...>>`, `list<struct<...>>`, or
`map<string, struct<...>>` when schema evolution is enabled.
## Problem
When `iceberg.tables.evolve-schema-enabled=true`, the `RecordConverter` only
checks top-level fields for schema evolution. It does not recursively traverse
nested fields inside complex types. As a result:
- New nested fields defined in the record schema are silently ignored
- The Iceberg table schema is never updated to include these nested fields
- No error is raised and no warning is logged; the connector keeps running as if nothing had changed
### Example Scenario
**Record Schema** (Kafka Connect):
```
orders (
  order_id: int
  customer: struct
    - customer_id: int
    - customer_details: struct (NEW)  <-- Not detected
        - name: string
        - email: string
)
```
**Iceberg Table Schema** (before):
```
orders (
  order_id: int
  customer: struct
    - customer_id: int
)
```
**Expected Behavior**: The `customer_details` nested struct should be added
to the Iceberg table schema.
**Actual Behavior**: The `customer_details` field is ignored; schema remains
unchanged.
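To make the gap concrete, here is a minimal Java sketch of the scenario above. It uses a hypothetical simplified model (nested `Map`s from field name to either a primitive type name or a nested struct), not the Kafka Connect or Iceberg schema classes, and the helper names `missingTopLevel` and `missingRecursive` are illustrative only. A top-level-only comparison finds no difference, while a recursive comparison reports `customer.customer_details`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified model: a schema maps a field name to either a primitive
// type name (String) or a nested struct (another Map). This is NOT the Kafka
// Connect or Iceberg API; it only illustrates why a top-level check misses fields.
class NestedDiffExample {

  // Top-level-only check: mirrors the behavior described in the Problem section.
  static List<String> missingTopLevel(Map<String, Object> record, Map<String, Object> table) {
    List<String> missing = new ArrayList<>();
    for (String name : record.keySet()) {
      if (!table.containsKey(name)) {
        missing.add(name);
      }
    }
    return missing;
  }

  // Recursive check: descends into structs that exist in both schemas.
  @SuppressWarnings("unchecked")
  static List<String> missingRecursive(
      Map<String, Object> record, Map<String, Object> table, String prefix) {
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, Object> e : record.entrySet()) {
      String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
      Object tableType = table.get(e.getKey());
      if (tableType == null) {
        missing.add(path); // new field: report it, no need to descend further
      } else if (e.getValue() instanceof Map && tableType instanceof Map) {
        missing.addAll(missingRecursive(
            (Map<String, Object>) e.getValue(), (Map<String, Object>) tableType, path));
      }
    }
    return missing;
  }

  // Builds an insertion-ordered struct from alternating name/type arguments.
  static Map<String, Object> struct(Object... kv) {
    Map<String, Object> m = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) {
      m.put((String) kv[i], kv[i + 1]);
    }
    return m;
  }

  public static void main(String[] args) {
    Map<String, Object> record = struct(
        "order_id", "int",
        "customer", struct(
            "customer_id", "int",
            "customer_details", struct("name", "string", "email", "string")));
    Map<String, Object> table = struct(
        "order_id", "int",
        "customer", struct("customer_id", "int"));

    System.out.println(missingTopLevel(record, table));      // []
    System.out.println(missingRecursive(record, table, "")); // [customer.customer_details]
  }
}
```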
## Solution
### Key Changes
1. **Added an `evolveSchema(SinkRecord, SchemaUpdate.Consumer)` method** in
   `RecordConverter`:
   - Separate public method for schema evolution
   - Traverses the entire record schema to detect all schema changes (new
     fields, type updates, optionality changes)
   - Does NOT convert data; it only collects schema updates
2. **Simplified the `convert(SinkRecord)` method**:
   - Now handles data conversion only
   - No longer takes a `SchemaUpdate.Consumer` parameter
3. **Added recursive schema evolution** for nested fields:
   - `evolveSchemaFromStruct()`: handles Struct records (with schema)
   - `evolveSchemaFromMap()`: handles schemaless Map records
   - `evolveFieldSchema()`: common logic for processing individual fields
   - `evolveNestedSchemaFromStruct()`: recursively processes nested fields
     in Struct records
   - `evolveNestedSchemaFromMapValue()`: recursively processes nested
     fields in Map records
   - `evolveMapAsStruct()`: processes Map values as struct fields
   - `evolveFirstStructElementInList()`: handles schema evolution for
     `list<struct>`
   - `evolveFirstStructValueInMap()`: handles schema evolution for
     `map<string, struct>`
4. **Correctly determines parent field names** for nested columns:
   - `struct<struct>` → uses the parent struct's field ID
   - `list<struct>` → uses the list element's field ID
   - `map<string, struct>` → uses the map value's field ID
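The recursive traversal and parent-name handling described in the Solution section can be sketched as follows. This is a hypothetical, self-contained model rather than the PR's code: `Type`, `StructType`, `ListType`, `MapType`, and `AddColumn` stand in for the Kafka Connect and Iceberg types, and the sketch only walks schemas (it does not inspect the first list element or map value of schemaless data, as `evolveFirstStructElementInList()` and `evolveFirstStructValueInMap()` do). It assumes parent names follow Iceberg's full-column-name convention, where a list element appears as `element` and a map value as `value` (for example `items.element`).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NestedEvolutionSketch {

  // Hypothetical minimal type model used for both the record schema and the table schema.
  interface Type {}

  record Primitive(String name) implements Type {}

  record StructType(Map<String, Type> fields) implements Type {}

  record ListType(Type element) implements Type {}

  record MapType(Type key, Type value) implements Type {}

  // Hypothetical "add column" update; parent is null for top-level columns.
  record AddColumn(String parent, String name, Type type) {}

  static final Type INT = new Primitive("int");
  static final Type STRING = new Primitive("string");

  // Reports record fields missing from the table struct; recurses into fields both sides have.
  static void evolveStruct(StructType rec, StructType tbl, String parent, List<AddColumn> out) {
    for (Map.Entry<String, Type> e : rec.fields().entrySet()) {
      String name = e.getKey();
      Type tblType = tbl.fields().get(name);
      if (tblType == null) {
        out.add(new AddColumn(parent, name, e.getValue()));
      } else {
        String path = parent == null ? name : parent + "." + name;
        evolveNested(e.getValue(), tblType, path, out);
      }
    }
  }

  // Descends into struct, list<struct> and map<_, struct>; primitives are not handled here.
  static void evolveNested(Type rec, Type tbl, String path, List<AddColumn> out) {
    if (rec instanceof StructType rs && tbl instanceof StructType ts) {
      evolveStruct(rs, ts, path, out);
    } else if (rec instanceof ListType rl && tbl instanceof ListType tl) {
      // Assumed convention: Iceberg names the list element field "element".
      evolveNested(rl.element(), tl.element(), path + ".element", out);
    } else if (rec instanceof MapType rm && tbl instanceof MapType tm) {
      // Assumed convention: Iceberg names the map value field "value".
      evolveNested(rm.value(), tm.value(), path + ".value", out);
    }
  }

  // Builds an insertion-ordered struct from alternating name/type arguments.
  static StructType struct(Object... kv) {
    Map<String, Type> fields = new LinkedHashMap<>();
    for (int i = 0; i < kv.length; i += 2) {
      fields.put((String) kv[i], (Type) kv[i + 1]);
    }
    return new StructType(fields);
  }

  // Record schema gains one nested field under a struct, a list<struct>, and a map<string, struct>.
  static List<AddColumn> exampleUpdates() {
    StructType rec = struct(
        "order_id", INT,
        "customer", struct("customer_id", INT, "customer_details", struct("name", STRING)),
        "items", new ListType(struct("sku", STRING, "qty", INT)),
        "addresses", new MapType(STRING, struct("city", STRING, "zip", STRING)));
    StructType tbl = struct(
        "order_id", INT,
        "customer", struct("customer_id", INT),
        "items", new ListType(struct("sku", STRING)),
        "addresses", new MapType(STRING, struct("city", STRING)));
    List<AddColumn> out = new ArrayList<>();
    evolveStruct(rec, tbl, null, out);
    return out;
  }

  public static void main(String[] args) {
    for (AddColumn u : exampleUpdates()) {
      System.out.println(u.parent() + " / " + u.name());
    }
  }
}
```

Running `main` prints one line per added column: `customer / customer_details`, `items.element / qty`, and `addresses.value / zip`, one for each of the three nested shapes listed above.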
## Configuration
To enable nested schema evolution, the connector must be configured to fetch
and use the most recent schema version from the Schema Registry, rather than
relying on the schema embedded in each message. Add the following configuration
properties:
```properties
# Disable embedded schema in messages
value.converter.schemas.enable=false
key.converter.schemas.enable=false
# Enable enhanced Avro schema support
value.converter.enhanced.avro.schema.support=true
key.converter.enhanced.avro.schema.support=true
# Use latest schema version from Schema Registry
key.converter.use.latest.version=true
value.converter.use.latest.version=true
# Enable auto schema evolution
value.converter.auto.evolve=true
key.converter.auto.evolve=true
```
## Testing
Added two new test cases:
1. **`testNestedSchemaEvolutionFromStruct`**: tests nested schema evolution
   using Struct records (with schema)
2. **`testNestedSchemaEvolutionFromMap`**: tests nested schema evolution
   using Map records (schemaless)
Both tests verify that:
- New fields inside nested structs are detected
- Correct parent field name is set (e.g., `outer.inner`)
- Correct field type is inferred
All existing tests continue to pass.
## Checklist
- [x] New fields inside `struct<struct<...>>` are detected and added
- [x] New fields inside `list<struct<...>>` are detected and added
- [x] New fields inside `map<string, struct<...>>` are detected and added
- [x] Type updates for nested primitive fields are detected
- [x] Optionality changes for nested fields are detected
- [x] Works for both Struct (with schema) and Map (schemaless) records
- [x] No breaking changes to existing functionality
- [x] All tests pass
- [x] Code formatted with Spotless
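The nested type-update and optionality checks in the checklist reduce to a per-field comparison once the traversal reaches a primitive. The sketch below is an illustration under stated assumptions, not the connector's code: `Field`, `Update`, `TypeUpdate`, and `MakeOptional` are hypothetical, and only the `int` → `long` and `float` → `double` promotions from the Iceberg spec are modeled (decimal precision widening is omitted).

```java
import java.util.ArrayList;
import java.util.List;

class NestedPrimitiveUpdates {

  // Hypothetical simplified column description: primitive type name plus optionality.
  record Field(String type, boolean optional) {}

  // Hypothetical update kinds, named for illustration only.
  interface Update {}

  record TypeUpdate(String path, String newType) implements Update {}

  record MakeOptional(String path) implements Update {}

  // Subset of Iceberg's allowed primitive promotions (decimal widening omitted).
  static boolean canPromote(String from, String to) {
    return (from.equals("int") && to.equals("long"))
        || (from.equals("float") && to.equals("double"));
  }

  // Compares a table column with the incoming record field at the same nested path.
  static List<Update> compare(String path, Field tbl, Field rec) {
    List<Update> updates = new ArrayList<>();
    if (canPromote(tbl.type(), rec.type())) {
      updates.add(new TypeUpdate(path, rec.type()));
    }
    if (!tbl.optional() && rec.optional()) {
      updates.add(new MakeOptional(path));
    }
    return updates;
  }

  public static void main(String[] args) {
    // Widened int -> long and relaxed to optional: expect a TypeUpdate and a MakeOptional.
    System.out.println(
        compare("customer.customer_id", new Field("int", false), new Field("long", true)));
    // Narrowing long -> int is not a legal promotion, so nothing is emitted.
    System.out.println(
        compare("customer.customer_id", new Field("long", true), new Field("int", true)));
  }
}
```

How the connector handles incompatible changes such as `long` → `int` is outside the scope of this sketch.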