rdblue commented on code in PR #13219: URL: https://github.com/apache/iceberg/pull/13219#discussion_r2229464159
########## spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java: ########## @@ -605,4 +723,172 @@ public void testRowLineage() throws Exception { record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)), record.copy(Map.of("id", 5L, "data", "e")))); } + + @Test + public void testUnshreddedVariant() throws IOException { + Assumptions.assumeThat(supportsVariant()).as("Variant support is not implemented").isTrue(); + + Schema schema = + new Schema(required(0, "id", LongType.get()), optional(1, "data", Types.VariantType.get())); + + writeAndValidate(schema); + } + + @Test + public void testShreddedVariant() throws IOException { + Assumptions.assumeThat(supportsVariant()).as("Variant support is not implemented").isTrue(); + + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT32)); + GroupType fieldB = + field( + "b", + shreddedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType())); + GroupType objectFields = objectFields(fieldA, fieldB); + GroupType variantType = variant("var", 2, objectFields); + MessageType parquetSchema = parquetSchema(variantType); + + Record recordA = record(fieldA, Map.of("value", serialize(Variants.ofNull()))); + Record recordB = record(fieldB, Map.of("typed_value", "iceberg")); + Record fields = record(objectFields, Map.of("a", recordA, "b", recordB)); + Record variant = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", fields)); + Record record = record(parquetSchema, Map.of("id", 1, "var", variant)); + InternalRow actual = writeAndRead(icebergSchema(parquetSchema), SCHEMA, record); + + Record expected = GenericRecord.create(SCHEMA); + expected.set(0, 1); + ShreddedObject expectedObject = Variants.object(TEST_METADATA); + expectedObject.put("a", Variants.ofNull()); + expectedObject.put("b", Variants.of("iceberg")); + expected.set(1, Variant.of(TEST_METADATA, expectedObject)); + + 
GenericsHelpers.assertEqualsUnsafe(SCHEMA.asStruct(), expected, actual); + } + + private static InternalRow writeAndRead(Schema writeSchema, Schema expectedSchema, Record record) + throws IOException { + return Iterables.getOnlyElement(writeAndRead(writeSchema, expectedSchema, List.of(record))); + } + + private static List<InternalRow> writeAndRead( + Schema writeSchema, Schema expectedSchema, List<Record> records) throws IOException { + OutputFile output = new InMemoryOutputFile(); + try (FileAppender<Record> writer = + Parquet.write(output) + .schema(writeSchema) + .createWriterFunc(GenericParquetWriter::create) + .named("test") + .build()) { + writer.addAll(records); + } + + try (CloseableIterable<InternalRow> reader = + Parquet.read(output.toInputFile()) + .project(expectedSchema) + .createReaderFunc( + type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT)) + .build()) { + return Lists.newArrayList(reader); + } + } + + private static ByteBuffer serialize(VariantValue value) { + ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + value.writeTo(buffer, 0); + return buffer; + } + + private static Record record(GroupType type, Map<String, Object> fields) { + Record record = org.apache.iceberg.data.GenericRecord.create(icebergSchema(type)); + for (Map.Entry<String, Object> entry : fields.entrySet()) { + record.setField(entry.getKey(), entry.getValue()); + } + return record; + } + + private static Schema icebergSchema(GroupType schema) { + if (schema instanceof MessageType) { + return ParquetSchemaUtil.convert((MessageType) schema); + + } else { + MessageType messageType = + org.apache.parquet.schema.Types.buildMessage() + .addFields(schema.getFields().toArray(new org.apache.parquet.schema.Type[0])) + .named(schema.getName()); + return ParquetSchemaUtil.convert(messageType); + } + } + + private static MessageType parquetSchema(org.apache.parquet.schema.Type variantType) { Review Comment: These static 
methods are all from `TestVariantReaders`, right? Should they be shared? This is quite a bit of test code just to validate that the readers are built correctly using the Parquet variant reader builder. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org