nateab opened a new pull request, #27591:
URL: https://github.com/apache/flink/pull/27591

   ## Summary
   
   - Fixes deserialization failure ("Found MyEnum, expecting union") when Avro 
records contain **enum types** and are read via the Table API with Kafka + 
Confluent Schema Registry
   - Root cause: `RegistryAvroFormatFactory` derives a reader schema from the Table DDL, 
but `AvroSchemaConverter` is lossy: enum fields are mapped to `STRING` and therefore to 
Avro `string`, which Avro's schema resolution cannot match against the writer's enum type 
inside a union (a minimal repro is sketched after this list)
   - Fix: when no explicit `avro-confluent.schema` is provided, pass `null` as 
the reader schema so the writer schema from the registry is used directly; 
`AvroToRowDataConverters` already handles enum→string via `.toString()`
   - Explicit user-provided schemas continue to work as before (schema 
evolution preserved)
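
   To make the failure mode concrete, here is a minimal, self-contained repro using plain 
Avro (no Flink classes). The record and enum names are illustrative only, not taken from 
an actual failing job:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EnumUnionRepro {
    public static void main(String[] args) throws Exception {
        // Writer schema, as it would be registered in Schema Registry: optional enum field.
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"color\","
                        + "\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"MyEnum\","
                        + "\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null}]}");
        // Reader schema as derived from a DDL column `color STRING`: the enum has collapsed to string.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"color\","
                        + "\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Serialize one record with the writer schema.
        Schema enumSchema = writerSchema.getField("color").schema().getTypes().get(1);
        GenericRecord rec = new GenericData.Record(writerSchema);
        rec.put("color", new GenericData.EnumSymbol(enumSchema, "RED"));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(rec, encoder);
        encoder.flush();

        // Resolve against the lossy reader schema: the enum branch matches nothing in
        // ["null","string"], so Avro throws AvroTypeException: "Found MyEnum, expecting union".
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    }
}
```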
   
   ## Changes
   
   ### `RegistryAvroFormatFactory.java`
   - Changed deserialization path from 
`.orElse(AvroSchemaConverter.convertToSchema(rowType))` to `.orElse(null)` — 
only when no `avro-confluent.schema` option is set
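
   A sketch of the resulting reader-schema derivation in the decoding path is below. The 
helper shape and parameter names are assumptions for illustration; only the two 
`.orElse(...)` variants are taken from this PR:

```java
import java.util.Optional;
import org.apache.avro.Schema;

final class ReaderSchemaSketch {
    /** Parses 'avro-confluent.schema' if set; otherwise returns null so the writer schema is used. */
    static Schema deriveReaderSchema(Optional<String> schemaOption) {
        return schemaOption
                .map(s -> new Schema.Parser().parse(s))
                .orElse(null); // before this PR: .orElse(AvroSchemaConverter.convertToSchema(rowType))
    }
}
```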
   
   ### `AvroDeserializationSchema.java`
   - `checkAvroInitialized()` now handles a null `schemaString` (skips 
`Schema.Parser().parse()`)
   - Skips JSON decoder creation when the reader schema is null (the binary decoder is 
schema-independent)
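
   The intended shape of the null-tolerant initialization, as a hedged sketch (field and 
method names follow the description above; the class shell and types are assumptions):

```java
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

final class NullableReaderSchemaSketch {
    private final String schemaString; // null when no 'avro-confluent.schema' was configured
    private Schema reader;
    private BinaryDecoder decoder;
    private Decoder jsonDecoder;

    NullableReaderSchemaSketch(String schemaString) {
        this.schemaString = schemaString;
    }

    void checkAvroInitialized(InputStream inputStream) throws IOException {
        // Only parse a reader schema when one was configured; null means
        // "resolve against the writer schema from the registry later".
        this.reader = schemaString != null ? new Schema.Parser().parse(schemaString) : null;

        // Binary decoding is schema-independent, so the binary decoder is always created.
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // JSON decoding needs a schema, so the JSON decoder is only created when a reader schema exists.
        if (this.reader != null) {
            this.jsonDecoder = DecoderFactory.get().jsonDecoder(this.reader, inputStream);
        }
    }
}
```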
   
   ### `RegistryAvroDeserializationSchema.java`
   - Falls back to writer schema when reader schema is null: 
`datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema)`
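
   As a sketch, the fallback sits where the datum reader is configured per message; the 
`setExpected(...)` call is quoted from this PR, while the surrounding method and names 
are assumptions:

```java
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;

final class RegistryFallbackSketch {
    /** Configures the datum reader so a null reader schema falls back to the writer schema. */
    static GenericRecord readWithFallback(
            GenericDatumReader<GenericRecord> datumReader,
            Schema writerSchema, // fetched from the registry for this message
            Schema readerSchema, // null unless 'avro-confluent.schema' was set
            Decoder decoder)
            throws IOException {
        datumReader.setSchema(writerSchema);
        datumReader.setExpected(readerSchema != null ? readerSchema : writerSchema);
        return datumReader.read(null, decoder);
    }
}
```

   When the expected schema equals the writer schema, Avro's resolving decoder has nothing 
to resolve, so fields and enum symbols are read exactly as written.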
   
   ### Tests
   - Added `testRowDataReadWithEnumFieldAndNullReaderSchema` in 
`RegistryAvroRowDataSeDeSchemaTest`: serializes a `GenericRecord` with an enum field in 
the Confluent wire format, deserializes it with a null reader schema, and verifies that 
the enum values come back as strings (the wire-format layout used by the test input is 
sketched after this list)
   - Updated `RegistryAvroFormatFactoryTest` to match the new null-schema 
behavior
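
   For reference, a Confluent wire-format payload like the one the new test deserializes 
can be produced as follows (magic byte `0x00`, 4-byte big-endian schema id, Avro binary 
body); the helper and its arguments are placeholders, not the test's actual fixtures:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

final class ConfluentWireFormatSketch {
    static byte[] encode(int schemaId, Schema writerSchema, GenericRecord record) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);                                               // magic byte
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // schema id as registered in the (mock) registry
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}
```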
   
   ## Why This Is Safe
   
   - `AvroToRowDataConverters` handles enums via `avroObject.toString()`, which works for 
both `String` and `GenericEnumSymbol` (see the sketch at the end of this list)
   - `IndexedRecord.get(i)` field access by index works correctly when the 
GenericRecord uses the writer schema
   - `AvroRowDataDeserializationSchema` provides its own `TypeInformation` 
(never calls `getProducedType()` on the inner schema)
   - Binary decoder creation is schema-independent
   - Serialization path is unchanged
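
   The enum-to-string behavior referenced in the first point amounts to the following; 
this is a sketch of the conversion shape described above, not a verbatim copy of 
`AvroToRowDataConverters`:

```java
import org.apache.flink.table.data.StringData;

final class EnumToStringSketch {
    static StringData convert(Object avroObject) {
        // avroObject may be a Utf8/String (writer wrote a string) or a
        // GenericEnumSymbol (writer wrote an enum); toString() is valid for all of them.
        return StringData.fromString(avroObject.toString());
    }
}
```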
   
   ## Test plan
   
   - [x] `RegistryAvroRowDataSeDeSchemaTest` — 5 tests pass (including new enum 
test)
   - [x] `RegistryAvroFormatFactoryTest` — 6 tests pass
   - [x] Full `flink-avro-confluent-registry` module — 28 tests, 0 failures
   - [x] Full `flink-avro` module — 337 tests (284 run, 53 skipped), 0 failures

