rdblue commented on code in PR #13219:
URL: https://github.com/apache/iceberg/pull/13219#discussion_r2229464159


##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java:
##########
@@ -605,4 +723,172 @@ public void testRowLineage() throws Exception {
             record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
             record.copy(Map.of("id", 5L, "data", "e"))));
   }
+
+  @Test
+  public void testUnshreddedVariant() throws IOException {
+    Assumptions.assumeThat(supportsVariant()).as("Variant support is not 
implemented").isTrue();
+
+    Schema schema =
+        new Schema(required(0, "id", LongType.get()), optional(1, "data", 
Types.VariantType.get()));
+
+    writeAndValidate(schema);
+  }
+
+  @Test
+  public void testShreddedVariant() throws IOException {
+    Assumptions.assumeThat(supportsVariant()).as("Variant support is not 
implemented").isTrue();
+
+    GroupType fieldA = field("a", 
shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT32));
+    GroupType fieldB =
+        field(
+            "b",
+            shreddedPrimitive(
+                PrimitiveType.PrimitiveTypeName.BINARY, 
LogicalTypeAnnotation.stringType()));
+    GroupType objectFields = objectFields(fieldA, fieldB);
+    GroupType variantType = variant("var", 2, objectFields);
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    Record recordA = record(fieldA, Map.of("value", 
serialize(Variants.ofNull())));
+    Record recordB = record(fieldB, Map.of("typed_value", "iceberg"));
+    Record fields = record(objectFields, Map.of("a", recordA, "b", recordB));
+    Record variant =
+        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, 
"typed_value", fields));
+    Record record = record(parquetSchema, Map.of("id", 1, "var", variant));
+    InternalRow actual = writeAndRead(icebergSchema(parquetSchema), SCHEMA, 
record);
+
+    Record expected = GenericRecord.create(SCHEMA);
+    expected.set(0, 1);
+    ShreddedObject expectedObject = Variants.object(TEST_METADATA);
+    expectedObject.put("a", Variants.ofNull());
+    expectedObject.put("b", Variants.of("iceberg"));
+    expected.set(1, Variant.of(TEST_METADATA, expectedObject));
+
+    GenericsHelpers.assertEqualsUnsafe(SCHEMA.asStruct(), expected, actual);
+  }
+
+  private static InternalRow writeAndRead(Schema writeSchema, Schema 
expectedSchema, Record record)
+      throws IOException {
+    return Iterables.getOnlyElement(writeAndRead(writeSchema, expectedSchema, 
List.of(record)));
+  }
+
+  private static List<InternalRow> writeAndRead(
+      Schema writeSchema, Schema expectedSchema, List<Record> records) throws 
IOException {
+    OutputFile output = new InMemoryOutputFile();
+    try (FileAppender<Record> writer =
+        Parquet.write(output)
+            .schema(writeSchema)
+            .createWriterFunc(GenericParquetWriter::create)
+            .named("test")
+            .build()) {
+      writer.addAll(records);
+    }
+
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(output.toInputFile())
+            .project(expectedSchema)
+            .createReaderFunc(
+                type -> SparkParquetReaders.buildReader(expectedSchema, type, 
ID_TO_CONSTANT))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  private static ByteBuffer serialize(VariantValue value) {
+    ByteBuffer buffer = 
ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    value.writeTo(buffer, 0);
+    return buffer;
+  }
+
+  private static Record record(GroupType type, Map<String, Object> fields) {
+    Record record = 
org.apache.iceberg.data.GenericRecord.create(icebergSchema(type));
+    for (Map.Entry<String, Object> entry : fields.entrySet()) {
+      record.setField(entry.getKey(), entry.getValue());
+    }
+    return record;
+  }
+
+  private static Schema icebergSchema(GroupType schema) {
+    if (schema instanceof MessageType) {
+      return ParquetSchemaUtil.convert((MessageType) schema);
+
+    } else {
+      MessageType messageType =
+          org.apache.parquet.schema.Types.buildMessage()
+              .addFields(schema.getFields().toArray(new 
org.apache.parquet.schema.Type[0]))
+              .named(schema.getName());
+      return ParquetSchemaUtil.convert(messageType);
+    }
+  }
+
+  private static MessageType parquetSchema(org.apache.parquet.schema.Type 
variantType) {

Review Comment:
   These static methods are all from `TestVariantReaders`, right? Should they be 
shared? This is quite a bit of test code just to validate that the readers are 
built correctly using the Parquet variant reader builder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to