aihuaxu commented on code in PR #12139: URL: https://github.com/apache/iceberg/pull/12139#discussion_r1958863116
########## core/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java: ########## @@ -27,10 +29,55 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class VariantTestUtil { private VariantTestUtil() {} + public static void assertEqual(VariantMetadata expected, VariantMetadata actual) { + assertThat(actual).isNotNull(); + assertThat(expected).isNotNull(); + assertThat(actual.dictionarySize()) + .as("Dictionary size should match") + .isEqualTo(expected.dictionarySize()); + + for (int i = 0; i < expected.dictionarySize(); i += 1) { + assertThat(actual.get(i)).isEqualTo(expected.get(i)); + } + } + + public static void assertEqual(VariantValue expected, VariantValue actual) { + assertThat(actual).isNotNull(); + assertThat(expected).isNotNull(); + assertThat(actual.type()).as("Variant type should match").isEqualTo(expected.type()); + + if (expected.type() == PhysicalType.OBJECT) { + VariantObject expectedObject = expected.asObject(); + VariantObject actualObject = actual.asObject(); + assertThat(actualObject.numFields()) + .as("Variant object num fields should match") + .isEqualTo(expectedObject.numFields()); + for (String fieldName : expectedObject.fieldNames()) { + assertEqual(expectedObject.get(fieldName), actualObject.get(fieldName)); + } + Review Comment: nit: remove extra empty line. Not exactly sure if this is expected code style in Iceberg. 
########## core/src/test/java/org/apache/iceberg/variants/VariantTestUtil.java: ########## @@ -27,10 +29,55 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class VariantTestUtil { private VariantTestUtil() {} + public static void assertEqual(VariantMetadata expected, VariantMetadata actual) { + assertThat(actual).isNotNull(); + assertThat(expected).isNotNull(); + assertThat(actual.dictionarySize()) + .as("Dictionary size should match") + .isEqualTo(expected.dictionarySize()); + + for (int i = 0; i < expected.dictionarySize(); i += 1) { + assertThat(actual.get(i)).isEqualTo(expected.get(i)); Review Comment: For this check, if one is sorted and the other is unsorted, then we are considering them different, right? Just want to confirm. ########## parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import java.util.List; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; + +public abstract class ParquetVariantVisitor<R> { + static final String METADATA = "metadata"; + static final String VALUE = "value"; + static final String TYPED_VALUE = "typed_value"; + + /** + * Handles the root variant column group. + * + * <p>The value and typed_value results are combined by calling {@link #value}. + * + * <pre> + * group v (VARIANT) { <-- metadata result and combined value and typed_value result + * required binary metadata; + * optional binary value; + * optional ... typed_value; + * } + * </pre> + */ + public R variant(GroupType variant, R metadataResult, R valueResult) { + return null; + } + + /** + * Handles a serialized variant metadata column. + * + * <pre> + * group v (VARIANT) { + * required binary metadata; <-- this column + * optional binary value; + * optional ... typed_value; + * } + * </pre> + */ + public R metadata(PrimitiveType metadata) { + return null; + } + + /** + * Handles a serialized variant value column. + * + * <pre> + * group variant_value_pair { + * optional binary value; <-- this column + * optional ... typed_value; + * } + * </pre> + */ + public R serialized(PrimitiveType value) { + return null; + } + + /** + * Handles a shredded primitive typed_value column. 
+ * + * <pre> + * group variant_value_pair { + * optional binary value; + * optional int32 typed_value; <-- this column when it is any primitive + * } + * </pre> + */ + public R primitive(PrimitiveType primitive) { + return null; + } + + /** + * Handles a variant value result and typed_value result pair. + * + * <p>The value and typed_value pair may be nested in an object field, array element, or in the + * root group of a variant. + * + * <p>This method is also called when the typed_value field is missing. + * + * <pre> + * group variant_value_pair { <-- value result and typed_value result + * optional binary value; + * optional ... typed_value; + * } + * </pre> + */ + public R value(GroupType value, R valueResult, R typedResult) { + return null; + } + + /** + * Handles a shredded object value result and a list of field value results. + * + * <p>Each field's value and typed_value results are combined by calling {@link #value}. + * + * <pre> + * group variant_value_pair { <-- value result and typed_value field results + * optional binary value; + * optional group typed_value { + * required group a { + * optional binary value; + * optional binary typed_value (UTF8); + * } + * ... + * } + * } + * </pre> + */ + public R object(GroupType object, R valueResult, List<R> fieldResults) { + return null; + } + + /** + * Handles a shredded array value result and an element value result. + * + * <p>The element's value and typed_value results are combined by calling {@link #value}. + * + * <pre> + * group variant_value_pair { <-- value result and element result + * optional binary value; + * optional group typed_value (LIST) { + * repeated group list { + * required group element { + * optional binary value; + * optional binary typed_value (UTF8); + * } + * } + * } + * } + * </pre> + */ + public R array(GroupType array, R valueResult, R elementResult) { + return null; + } + + /** Handler called before visiting any primitive or group type. 
*/ + public void beforeField(Type type) {} + + /** Handler called after visiting any primitive or group type. */ + public void afterField(Type type) {} + + public static <R> R visit(GroupType type, ParquetVariantVisitor<R> visitor) { + Preconditions.checkArgument( + ParquetSchemaUtil.hasField(type, METADATA), "Invalid variant, missing metadata: %s", type); + + Type metadataType = type.getType(METADATA); + Preconditions.checkArgument( + isBinary(metadataType), "Invalid variant metadata, expecting BINARY: %s", metadataType); + + R metadataResult = + withBeforeAndAfter( + () -> visitor.metadata(metadataType.asPrimitiveType()), metadataType, visitor); + R valueResult = visitValue(type, visitor); + + return visitor.variant(type, metadataResult, valueResult); + } + + public static <R> R visitValue(GroupType valueGroup, ParquetVariantVisitor<R> visitor) { Review Comment: This can be private scope for now. ########## parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java: ########## @@ -0,0 +1,1107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.VariantType; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantPrimitive; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.hadoop.ParquetWriter; +import 
org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.FieldSource; +import org.junit.jupiter.params.provider.MethodSource; + +public class TestVariantReaders { + private static final Schema SCHEMA = + new Schema( + NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "var", VariantType.get())); + + private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType(); + + private static final ByteBuffer TEST_METADATA_BUFFER = + VariantTestUtil.createMetadata(ImmutableList.of("a", "b", "c", "d", "e"), true); + private static final ByteBuffer TEST_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.ofNull(), + "d", Variants.of(PhysicalType.STRING, "iceberg"))); + + private static final VariantMetadata EMPTY_METADATA = + Variants.metadata(VariantTestUtil.emptyMetadata()); + private static final VariantMetadata TEST_METADATA = Variants.metadata(TEST_METADATA_BUFFER); + private static final VariantObject TEST_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, TEST_OBJECT_BUFFER); + + private static final VariantPrimitive<?>[] PRIMITIVES = + new VariantPrimitive[] { + Variants.ofNull(), + Variants.of(true), + Variants.of(false), + Variants.of((byte) 34), + Variants.of((byte) -34), + Variants.of((short) 1234), + Variants.of((short) -1234), + Variants.of(12345), + Variants.of(-12345), + 
Variants.of(9876543210L), + Variants.of(-9876543210L), + Variants.of(10.11F), + Variants.of(-10.11F), + Variants.of(14.3D), + Variants.of(-14.3D), + Variants.ofIsoDate("2024-11-07"), + Variants.ofIsoDate("1957-11-07"), + Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456"), + Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456"), + Variants.of(new BigDecimal("123456.7890")), // decimal4 + Variants.of(new BigDecimal("-123456.7890")), // decimal4 + Variants.of(new BigDecimal("1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("-1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("9876543210.123456789")), // decimal16 + Variants.of(new BigDecimal("-9876543210.123456789")), // decimal16 + Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})), + Variants.of("iceberg"), + }; + + private static Stream<Arguments> metadataAndValues() { + Stream<Arguments> primitives = + Stream.of(PRIMITIVES).map(variant -> Arguments.of(EMPTY_METADATA, variant)); + Stream<Arguments> object = Stream.of(Arguments.of(TEST_METADATA, TEST_OBJECT)); + return Streams.concat(primitives, object); + } + + @ParameterizedTest + @MethodSource("metadataAndValues") + public void testUnshreddedVariants(VariantMetadata metadata, VariantValue expected) + throws IOException { + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + 
VariantTestUtil.assertEqual(metadata, actualVariant.metadata()); + VariantTestUtil.assertEqual(expected, actualVariant.value()); + } + + @ParameterizedTest + @MethodSource("metadataAndValues") + public void testUnshreddedVariantsWithShreddedSchema( + VariantMetadata metadata, VariantValue expected) throws IOException { + // the variant's Parquet schema has a shredded field that is unused by all data values + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(metadata, actualVariant.metadata()); + VariantTestUtil.assertEqual(expected, actualVariant.value()); + } + + @ParameterizedTest + @FieldSource("PRIMITIVES") + public void testShreddedVariantPrimitives(VariantPrimitive<?> primitive) throws IOException { + Assumptions.assumeThat(primitive.type() != PhysicalType.NULL) + .as("Null is not a shredded type") + .isTrue(); + + GroupType variantType = variant("var", 2, shreddedType(primitive)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record( + variantType, + Map.of( + "metadata", + VariantTestUtil.emptyMetadata(), + "typed_value", + toAvroValue(primitive))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) 
actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(primitive, actualVariant.value()); + } + + @Test + public void testNullValueAndNullTypedValue() throws IOException { + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata())); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + } + + @Test + public void testMissingValueColumn() throws IOException { + GroupType variantType = + Types.buildGroup(Type.Repetition.REQUIRED) + .id(2) + .required(PrimitiveTypeName.BINARY) + .named("metadata") + .addField(shreddedPrimitive(PrimitiveTypeName.INT32)) + .named("var"); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 34)); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.of(34), actualVariant.value()); + } + + @Test + public void testValueAndTypedValueConflict() throws IOException { + GroupType variantType = 
variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record( + variantType, + Map.of( + "metadata", + VariantTestUtil.emptyMetadata(), + "value", + serialize(Variants.of("str")), + "typed_value", + 34)); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + assertThatThrownBy(() -> writeAndRead(parquetSchema, record)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid variant, conflicting value and typed_value"); + } + + @Test + public void testUnsignedInteger() { + GroupType variantType = + variant( + "var", + 2, + shreddedPrimitive(PrimitiveTypeName.INT32, LogicalTypeAnnotation.intType(32, false))); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata())); Review Comment: Add typed_value `record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 34));` to have the shredded value? ########## parquet/src/main/java/org/apache/iceberg/parquet/TypeWithSchemaVisitor.java: ########## @@ -201,6 +211,17 @@ private static <T> List<T> visitFields( return results; } + private static <T> T visitVariant( + Types.VariantType variant, GroupType group, TypeWithSchemaVisitor<T> visitor) { + ParquetVariantVisitor<T> variantVisitor = visitor.variantVisitor(); + if (variantVisitor != null) { + T variantResult = ParquetVariantVisitor.visit(group, variantVisitor); Review Comment: Rather than creating a static visit() function, can we use `variantVisitor.visit(group)`? With that, we have many functions in ParquetVariantVisitor.java to be class instance method rather than static method. Seems that will make the code there simpler. 
########## parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java: ########## @@ -0,0 +1,1107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.VariantType; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantObject; +import org.apache.iceberg.variants.VariantPrimitive; +import org.apache.iceberg.variants.VariantTestUtil; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.FieldSource; +import org.junit.jupiter.params.provider.MethodSource; + +public class TestVariantReaders { + private static final Schema SCHEMA = + new Schema( + NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "var", VariantType.get())); + + private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType(); + + private static final 
ByteBuffer TEST_METADATA_BUFFER = + VariantTestUtil.createMetadata(ImmutableList.of("a", "b", "c", "d", "e"), true); + private static final ByteBuffer TEST_OBJECT_BUFFER = + VariantTestUtil.createObject( + TEST_METADATA_BUFFER, + ImmutableMap.of( + "a", Variants.ofNull(), + "d", Variants.of(PhysicalType.STRING, "iceberg"))); + + private static final VariantMetadata EMPTY_METADATA = + Variants.metadata(VariantTestUtil.emptyMetadata()); + private static final VariantMetadata TEST_METADATA = Variants.metadata(TEST_METADATA_BUFFER); + private static final VariantObject TEST_OBJECT = + (VariantObject) Variants.value(TEST_METADATA, TEST_OBJECT_BUFFER); + + private static final VariantPrimitive<?>[] PRIMITIVES = + new VariantPrimitive[] { + Variants.ofNull(), + Variants.of(true), + Variants.of(false), + Variants.of((byte) 34), + Variants.of((byte) -34), + Variants.of((short) 1234), + Variants.of((short) -1234), + Variants.of(12345), + Variants.of(-12345), + Variants.of(9876543210L), + Variants.of(-9876543210L), + Variants.of(10.11F), + Variants.of(-10.11F), + Variants.of(14.3D), + Variants.of(-14.3D), + Variants.ofIsoDate("2024-11-07"), + Variants.ofIsoDate("1957-11-07"), + Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00"), + Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456"), + Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456"), + Variants.of(new BigDecimal("123456.7890")), // decimal4 + Variants.of(new BigDecimal("-123456.7890")), // decimal4 + Variants.of(new BigDecimal("1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("-1234567890.987654321")), // decimal8 + Variants.of(new BigDecimal("9876543210.123456789")), // decimal16 + Variants.of(new BigDecimal("-9876543210.123456789")), // decimal16 + Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})), + Variants.of("iceberg"), + }; + + private static Stream<Arguments> metadataAndValues() { + 
Stream<Arguments> primitives = + Stream.of(PRIMITIVES).map(variant -> Arguments.of(EMPTY_METADATA, variant)); + Stream<Arguments> object = Stream.of(Arguments.of(TEST_METADATA, TEST_OBJECT)); + return Streams.concat(primitives, object); + } + + @ParameterizedTest + @MethodSource("metadataAndValues") + public void testUnshreddedVariants(VariantMetadata metadata, VariantValue expected) + throws IOException { + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(metadata, actualVariant.metadata()); + VariantTestUtil.assertEqual(expected, actualVariant.value()); + } + + @ParameterizedTest + @MethodSource("metadataAndValues") + public void testUnshreddedVariantsWithShreddedSchema( + VariantMetadata metadata, VariantValue expected) throws IOException { + // the variant's Parquet schema has a shredded field that is unused by all data values + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(metadata, 
actualVariant.metadata()); + VariantTestUtil.assertEqual(expected, actualVariant.value()); + } + + @ParameterizedTest + @FieldSource("PRIMITIVES") + public void testShreddedVariantPrimitives(VariantPrimitive<?> primitive) throws IOException { + Assumptions.assumeThat(primitive.type() != PhysicalType.NULL) + .as("Null is not a shredded type") + .isTrue(); + + GroupType variantType = variant("var", 2, shreddedType(primitive)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record( + variantType, + Map.of( + "metadata", + VariantTestUtil.emptyMetadata(), + "typed_value", + toAvroValue(primitive))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(primitive, actualVariant.value()); + } + + @Test + public void testNullValueAndNullTypedValue() throws IOException { + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata())); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + } + + @Test + public void testMissingValueColumn() throws IOException { + GroupType variantType = + 
Types.buildGroup(Type.Repetition.REQUIRED) + .id(2) + .required(PrimitiveTypeName.BINARY) + .named("metadata") + .addField(shreddedPrimitive(PrimitiveTypeName.INT32)) + .named("var"); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord variant = + record(variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 34)); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.of(34), actualVariant.value()); + } + + @Test + public void testValueAndTypedValueConflict() throws IOException { Review Comment: IOException is not thrown here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org