kzajaczkowski opened a new issue, #16212: URL: https://github.com/apache/iceberg/issues/16212
## Summary

`iceberg.tables.evolve-schema-enabled: true` cannot rescue an **empty-struct → non-empty-struct** migration that arises from Protobuf `oneof` arms that were initially zero-field marker messages. When a new schema version adds fields to those arms, the Parquet writer opens against the stale table schema (which still contains `struct<>` columns) and immediately throws `InvalidSchemaException`. The task is killed; data is lost.

This is distinct from [#15395](https://github.com/apache/iceberg/issues/15395), which describes a **silent drop** (no evolution, no crash) for nested structs that were never empty. The failure here is louder — the task transitions to `FAILED` at the very first record of the new schema version — and the root cause is different.

---

## Version

- `iceberg-kafka-connect` 1.9.2 (also reproduced with the 1.9.2 connector against `iceberg-core` 1.9.2, `parquet-column` 1.15.2)
- Confluent `cp-kafka-connect-base` 7.6.6 (Apache Kafka Connect 3.7.x)
- Schema Registry: Confluent 7.6.6
- Catalog: Iceberg REST (Nessie 0.103.3 + MinIO)

---

## Proto Shape (Minimum Reproduction)

**V1** — zero-field marker arms:

```protobuf
syntax = "proto3";

package example.v1;

import "google/protobuf/timestamp.proto";

message CardPINEvent {
  string card_id = 1;
  google.protobuf.Timestamp event_timestamp = 2;
  oneof event {
    CardPINBlockedEvent blocked = 3;
    CardPINUnblockedEvent unblocked = 4;
    CardPINChangedEvent changed = 5;
  }
}

message CardPINBlockedEvent {}
message CardPINUnblockedEvent {}
message CardPINChangedEvent {}
```

**V2** — Timestamp field added to each arm (BACKWARD-compatible per SR):

```protobuf
message CardPINBlockedEvent {
  google.protobuf.Timestamp blocked_timestamp = 1;
}
message CardPINUnblockedEvent {
  google.protobuf.Timestamp unblocked_timestamp = 1;
}
message CardPINChangedEvent {
  google.protobuf.Timestamp changed_timestamp = 1;
}
```

---

## Connector Config

```json
{
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "1",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": "5000",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```

---

## Reproduction Steps

1. Register the V1 schema; produce **3** V1 records (deliberately below the flush threshold of ~40). The sink auto-creates the Iceberg table:

   ```
   event_0 struct<
     blocked:   struct<>   ← empty arm
     unblocked: struct<>   ← empty arm
     changed:   struct<>   ← empty arm
   >
   ```

   The coordinator emits `committed to 0 table(s)` — the table exists with the empty-arm schema, but no Parquet data is written yet (silent-drop state).

2. Produce **50** V2 records (total = 53, crosses the flush threshold). Each record sets only the `unblocked` arm:

   ```json
   {"cardId":"se-1","eventTimestamp":"2026-04-24T11:00:00Z","unblocked":{"unblockedTimestamp":"2026-04-24T11:00:00Z"},"reason":"PIN unblocked"}
   ```

3. Within seconds the task transitions to `FAILED`.
---

## Observed Behaviour

Task `FAILED` at offset 3 (the very first V2 record):

```
Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: example.card_pin_event.v1, partition: 0, offset: 3
Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group blocked = 2 { }
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:458)
    at org.apache.iceberg.parquet.ParquetWriter.ensureWriterInitialized(ParquetWriter.java:114)
    at org.apache.iceberg.parquet.ParquetWriter.flushRowGroup(ParquetWriter.java:214)
    at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:258)
    at org.apache.iceberg.io.PartitionedFanoutWriter.close(PartitionedFanoutWriter.java:70)
    at org.apache.iceberg.io.BaseTaskWriter.complete(BaseTaskWriter.java:100)
    at org.apache.iceberg.connect.data.IcebergWriter.flush(IcebergWriter.java:103)
    at org.apache.iceberg.connect.data.IcebergWriter.complete(IcebergWriter.java:118)
    at org.apache.iceberg.connect.data.SinkWriter.lambda$completeWrite$0(SinkWriter.java:57)
```

No schema-evolution log line (`Schema for table … updated`) appears before the failure. The Parquet writer is opened against the stale V1 table schema **before** any evolution is attempted.

---

## Expected Behaviour

`evolve-schema-enabled: true` should promote all empty-arm structs to their new non-empty shape before the Parquet writer is initialised, so the write succeeds without manual intervention.

---

## Root Cause Analysis

The bug is in `RecordConverter.convertStructValue()`.
When the sink processes a `SinkRecord`, it calls the schema-update consumer (`addColumn`) only for struct fields whose value is **non-null in the current record**:

```java
// RecordConverter.convertStructValue() — simplified
for (Field recordField : recordSchema.fields()) {
  Object value = struct.get(recordField);
  if (value == null) {
    continue; // ← null field: no addColumn callback fired
  }
  // schema-update path runs only here
  ...
  schemaUpdateConsumer.addColumn(path, name, icebergType);
}
```

In a Protobuf `oneof`, the Confluent ProtobufConverter wraps all arms in a synthetic outer struct (`event_0: struct<blocked, unblocked, changed>`). The active arm contains a value; all other arms are `null`. Therefore:

- Only the **set arm** (`unblocked`) has its sub-fields discovered via `addColumn`.
- `blocked` and `changed` are null → no callback → they remain `struct<>` in the table schema.
- When the writer reinitialises, it reads the partially-evolved table. Parquet's `TypeUtil.checkValidWriteSchema` rejects the first `struct<>` it encounters — in this run, `optional group blocked = 2 {}`.

The Connect schema attached to the `SinkRecord` (`record.valueSchema()`) **does** contain the full V2 type for every arm, including the arms that are null in this record. The fix should use that typed schema information to evolve the column even when the value is null.

---

## Proposed Fix

In `RecordConverter.convertStructValue()`, in the `else` branch where `value == null`, check whether the existing table column for that field is a `struct` that is currently empty. If it is, look up the expected Iceberg type from the Connect schema (via `SchemaUtils.toIcebergType(recordField.schema(), config)`) and fire `addColumn` for each sub-field:

```java
} else {
  // value is null — but if the existing column is an empty struct and
  // the Connect schema has sub-fields, evolve the column now
  // so the Parquet writer never sees a zero-field group.
  if (existingColumn != null
      && existingColumn.type().isStructType()
      && existingColumn.type().asStructType().fields().isEmpty()
      && recordField.schema().type() == Schema.Type.STRUCT) {
    Type newType = SchemaUtils.toIcebergType(recordField.schema(), config);
    schemaUpdateConsumer.addColumn(path, name, newType);
  }
}
```

This approach requires no value — only the schema — and is therefore safe for any null field, not just `oneof` arms.

---

## Relationship to #15395

[#15395](https://github.com/apache/iceberg/issues/15395) ("Does not evolve schema for nested fields inside struct/list/map of structs") reports that evolution silently skips sub-fields of a **non-empty** existing struct when new sub-fields are added. That is a silent drop; the task stays `RUNNING`. This issue is a separate, more severe case: the existing column is a **zero-field struct**, and the failure mode is a task `FAILED` at the first write attempt. Both issues share the same null-skipping code path but trigger different downstream failures.

---

## Workaround

`evolve-schema-enabled` alone cannot recover from this state. Operators must either:

- Drop and recreate the Iceberg table (losing any metadata snapshot history), or
- Manually add the missing sub-fields via `ALTER TABLE … ADD COLUMN` before restarting the connector.
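The effect of the proposed schema-driven `else` branch can be modelled with a stdlib-only Java sketch (all names hypothetical; this is a toy model of the proposed logic, not the actual patch). When a field's value is null but the existing column is an empty struct, the sub-fields are taken from the Connect schema instead of the value:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the proposed fix: evolve empty-struct columns from the
// Connect schema even when the arm's value is null in the current record.
public class SchemaDrivenEvolutionDemo {

  // Connect schema for V2: every arm declares its sub-fields, including
  // arms that are null in any given record.
  static final Map<String, Set<String>> CONNECT_SCHEMA = Map.of(
      "blocked", Set.of("blocked_timestamp"),
      "unblocked", Set.of("unblocked_timestamp"),
      "changed", Set.of("changed_timestamp"));

  static Map<String, Set<String>> convert(Map<String, Map<String, Object>> record) {
    // Table schema after V1 auto-create: every arm is struct<>.
    Map<String, Set<String>> table = new LinkedHashMap<>();
    CONNECT_SCHEMA.keySet().forEach(arm -> table.put(arm, new LinkedHashSet<>()));

    for (Map.Entry<String, Map<String, Object>> arm : record.entrySet()) {
      Set<String> existing = table.get(arm.getKey());
      if (arm.getValue() != null) {
        existing.addAll(arm.getValue().keySet()); // value-driven addColumn
      } else if (existing.isEmpty()) {
        // Proposed fix: null value, but the existing column is an empty
        // struct -> take the sub-fields from the Connect schema.
        existing.addAll(CONNECT_SCHEMA.get(arm.getKey()));
      }
    }
    return table;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Object>> record = new LinkedHashMap<>();
    record.put("blocked", null);
    record.put("unblocked", Map.of("unblocked_timestamp", "2026-04-24T11:00:00Z"));
    record.put("changed", null);

    // After conversion no arm is a zero-field struct, so the Parquet
    // writer would no longer trip checkValidWriteSchema.
    System.out.println(convert(record));
  }
}
```

In this model every arm ends up non-empty after the first V2 record, regardless of which arm was set, which is the property the real fix needs.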
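For the manual `ALTER TABLE` route, Iceberg's Spark SQL extensions support adding nested struct sub-fields directly. A sketch, assuming a Spark session wired to the same REST catalog; the catalog, namespace, and table names below are placeholders, and the column paths follow the auto-created schema shown in the reproduction steps:

```sql
-- Placeholder identifiers: rest_catalog.db.card_pin_event
ALTER TABLE rest_catalog.db.card_pin_event
  ADD COLUMN event_0.blocked.blocked_timestamp timestamp;
ALTER TABLE rest_catalog.db.card_pin_event
  ADD COLUMN event_0.unblocked.unblocked_timestamp timestamp;
ALTER TABLE rest_catalog.db.card_pin_event
  ADD COLUMN event_0.changed.changed_timestamp timestamp;
```

After the three empty arms gain their sub-fields, restarting the connector lets the flush proceed without hitting `checkValidWriteSchema`.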
