nateab opened a new pull request, #27591:
URL: https://github.com/apache/flink/pull/27591
## Summary
- Fixes deserialization failure ("Found MyEnum, expecting union") when Avro
records contain **enum types** and are read via the Table API with Kafka +
Confluent Schema Registry
- Root cause: `RegistryAvroFormatFactory` derives a reader schema from the
Table DDL, but `AvroSchemaConverter` is lossy: enum fields map to the `STRING`
logical type and convert back to a plain Avro `string`, which Avro's schema
resolution cannot match against the writer's enum type inside a union (see the
sketch after this list)
- Fix: when no explicit `avro-confluent.schema` is provided, pass `null` as
the reader schema so the writer schema from the registry is used directly;
`AvroToRowDataConverters` already handles enum→string via `.toString()`
- Explicit user-provided schemas continue to work as before (schema
evolution preserved)
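
To make the mismatch concrete, here is a minimal sketch of the two schemas
involved; the record, field, and enum names are illustrative, not taken from an
actual failing job:

```java
import org.apache.avro.Schema;

public class EnumUnionMismatchSketch {
    public static void main(String[] args) {
        // Writer schema registered by the producer: a nullable enum field.
        Schema writer = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":["
                        + "{\"name\":\"color\",\"type\":[\"null\","
                        + "{\"type\":\"enum\",\"name\":\"MyEnum\",\"symbols\":[\"RED\",\"GREEN\"]}]}]}");

        // Reader schema derived from the Table DDL: the enum column is declared
        // STRING, so AvroSchemaConverter emits a plain Avro string in its place.
        Schema reader = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":["
                        + "{\"name\":\"color\",\"type\":[\"null\",\"string\"]}]}");

        // Avro's schema resolution cannot match the writer's MyEnum branch against
        // the reader's ["null","string"] union, which surfaces as
        // "Found MyEnum, expecting union" at read time.
        System.out.println("writer: " + writer);
        System.out.println("reader: " + reader);
    }
}
```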
## Changes
### `RegistryAvroFormatFactory.java`
- Changed the deserialization path from
`.orElse(AvroSchemaConverter.convertToSchema(rowType))` to `.orElse(null)`, so
the reader schema is only omitted when no `avro-confluent.schema` option is set
(sketch below)
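
A minimal before/after sketch of the resolution logic; the class, method, and
variable names are illustrative, not the exact code in the factory:

```java
import java.util.Optional;

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.logical.RowType;

final class ReaderSchemaResolutionSketch {

    // Before: a missing option fell back to a DDL-derived schema, which turns
    // enum fields into plain strings and breaks schema resolution.
    static Schema before(Optional<String> schemaOption, RowType rowType) {
        return schemaOption
                .map(s -> new Schema.Parser().parse(s))
                .orElse(AvroSchemaConverter.convertToSchema(rowType));
    }

    // After: with no explicit avro-confluent.schema, return null so the
    // deserializer reads with the writer schema fetched from the registry.
    static Schema after(Optional<String> schemaOption, RowType rowType) {
        return schemaOption
                .map(s -> new Schema.Parser().parse(s))
                .orElse(null);
    }
}
```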
### `AvroDeserializationSchema.java`
- `checkAvroInitialized()` now tolerates a null `schemaString` and skips
`Schema.Parser().parse()` in that case (see the sketch below)
- Skips JSON decoder creation when reader schema is null (binary decoder is
schema-independent)
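
A small sketch of the null-tolerant pieces, using hypothetical helper names;
the real class keeps this logic inside `checkAvroInitialized()`:

```java
import java.io.InputStream;

import javax.annotation.Nullable;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

final class NullableReaderSchemaSketch {

    /**
     * Parses the configured reader schema, or returns null when none was provided,
     * meaning "read with the writer schema fetched from the registry".
     */
    @Nullable
    static Schema parseReaderSchemaOrNull(@Nullable String schemaString) {
        return schemaString == null ? null : new Schema.Parser().parse(schemaString);
    }

    /**
     * Binary decoding needs no schema up front, unlike the JSON decoder, so it is
     * safe to create even when the reader schema is null.
     */
    static Decoder createBinaryDecoder(InputStream in) {
        return DecoderFactory.get().binaryDecoder(in, null);
    }
}
```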
### `RegistryAvroDeserializationSchema.java`
- Falls back to the writer schema when the reader schema is null:
`datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema)`
(see the sketch below)
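
A self-contained sketch of the read path, with the writer schema passed in
directly instead of being decoded by the registry schema coder; class and
method names are illustrative:

```java
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

final class WriterSchemaFallbackSketch {

    /** Reads one record, expecting the writer schema when no reader schema is configured. */
    static GenericRecord read(InputStream in, Schema writerSchema, Schema readerSchema)
            throws IOException {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        datumReader.setSchema(writerSchema);
        // The one-line fix: fall back to the writer schema instead of a DDL-derived one.
        datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        return datumReader.read(null, decoder);
    }
}
```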
### Tests
- Added `testRowDataReadWithEnumFieldAndNullReaderSchema` in
`RegistryAvroRowDataSeDeSchemaTest`: it serializes a `GenericRecord` with an
enum field in the Confluent wire format, deserializes it with a null reader
schema, and verifies that the enum values come back as strings (wire-format
framing sketched after this list)
- Updated `RegistryAvroFormatFactoryTest` to match the new null-schema
behavior
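
For reference, a sketch of the Confluent wire-format framing the new test
relies on; the real test goes through the mock schema-registry client, and the
helper name and schema id here are illustrative:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

final class ConfluentWireFormatSketch {

    /** Frames an Avro-binary record as: magic byte 0x00 + 4-byte schema id + payload. */
    static byte[] serialize(GenericRecord record, Schema writerSchema, int schemaId)
            throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(payload, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        ByteBuffer framed = ByteBuffer.allocate(1 + 4 + payload.size());
        framed.put((byte) 0);      // Confluent magic byte
        framed.putInt(schemaId);   // id under which the writer schema is registered
        framed.put(payload.toByteArray());
        return framed.array();
    }
}
```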
## Why This Is Safe
- `AvroToRowDataConverters` converts enums via `avroObject.toString()`, which
works for both `String` and `GenericEnumSymbol` (see the snippet after this
list)
- `IndexedRecord.get(i)` field access by index works correctly when the
GenericRecord uses the writer schema
- `AvroRowDataDeserializationSchema` provides its own `TypeInformation`
(never calls `getProducedType()` on the inner schema)
- Binary decoder creation is schema-independent
- Serialization path is unchanged
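
A tiny runnable check of the first point; the enum name and symbols are
placeholders:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.table.data.StringData;

public class EnumToStringDataSketch {
    public static void main(String[] args) {
        Schema enumSchema = new Schema.Parser().parse(
                "{\"type\":\"enum\",\"name\":\"MyEnum\",\"symbols\":[\"RED\",\"GREEN\"]}");
        Object fromWriterSchema = new GenericData.EnumSymbol(enumSchema, "RED"); // enum path
        Object fromStringSchema = "RED";                                         // string path

        // Both values convert identically, the same way the STRING converter does.
        StringData a = StringData.fromString(fromWriterSchema.toString());
        StringData b = StringData.fromString(fromStringSchema.toString());
        System.out.println(a.equals(b)); // true
    }
}
```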
## Test plan
- [x] `RegistryAvroRowDataSeDeSchemaTest` — 5 tests pass (including new enum
test)
- [x] `RegistryAvroFormatFactoryTest` — 6 tests pass
- [x] Full `flink-avro-confluent-registry` module — 28 tests, 0 failures
- [x] Full `flink-avro` module — 337 tests (284 run, 53 skipped), 0 failures