stevenzwu commented on code in PR #14259:
URL: https://github.com/apache/iceberg/pull/14259#discussion_r2511476214


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java:
##########
@@ -147,6 +149,18 @@ public static <T> T visit(
         } finally {
           visitor.fieldNames.pop();
         }
+      } else if (LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION).equals(annotation)
+          || sType instanceof VariantType) {

Review Comment:
   Flink can probably use different/cleaner logic here, since Flink doesn't produce a variant type without the Parquet annotation. Spark may produce a variant type without the annotation, because the Spark integration was added before the Parquet logical type was released.
   
   Hence I am thinking maybe something like this:
   ```
     } else if (LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION).equals(annotation)) {
       ...
     }
   ```
   
   @Fokko @aihuaxu what do you think?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -605,4 +627,50 @@ protected Object get(RowData struct, int index) {
       return fieldGetter[index].getFieldOrNull(struct);
     }
   }
+
+  /** Variant writer converts from Flink Variant to Iceberg Variant */
+  public static class VariantWriter implements ParquetValueWriter<Variant> {
+    private final ParquetValueWriter<org.apache.iceberg.variants.Variant> writer;
+
+    private VariantWriter(ParquetValueWriter<?> writer) {
+      this.writer = (ParquetValueWriter<org.apache.iceberg.variants.Variant>) writer;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Variant variant) {
+      // Flink's Variant implementation uses BinaryVariant as the standard
+      // Based on FLIP-521
+      if (!(variant instanceof BinaryVariant)) {

Review Comment:
   nit: use `Preconditions.checkArgument`
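   
   e.g. a minimal sketch, assuming the relocated Guava `Preconditions` that Iceberg modules typically import:
   ```
   import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
   
   // replaces the manual instanceof check and explicit throw
   Preconditions.checkArgument(
       variant instanceof BinaryVariant,
       "Expected BinaryVariant but got: %s", variant.getClass().getSimpleName());
   ```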



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java:
##########
@@ -378,11 +378,55 @@ private static void assertEquals(
             .isInstanceOf(byte[].class)
             .isEqualTo(expected);
         break;
+      case VARIANT:
+        assertThat(actual)
+            .as("Should expect a Variant")
+            .isInstanceOf(org.apache.flink.types.variant.Variant.class);
+        assertThat(expected)
+            .as("Should expect a Variant")
+            .isInstanceOf(org.apache.iceberg.variants.Variant.class);
+
+        org.apache.flink.types.variant.Variant flinkVariant =
+            (org.apache.flink.types.variant.Variant) actual;
+        org.apache.iceberg.variants.Variant icebergVariant =
+            (org.apache.iceberg.variants.Variant) expected;
+
+        compareVariants(icebergVariant, flinkVariant);
+        break;
       default:
         throw new IllegalArgumentException("Not a supported type: " + type);
     }
   }
 
+  /**
+   * Compare two variants by their underlying data structures rather than string representations.
+   * This method handles the differences between Iceberg and Flink variant implementations.
+   */
+  private static void compareVariants(
+      org.apache.iceberg.variants.Variant icebergVariant,
+      org.apache.flink.types.variant.Variant flinkVariant) {
+
+    // Extract the underlying data from both variants
+    org.apache.iceberg.variants.VariantValue icebergValue = 
icebergVariant.value();
+    org.apache.iceberg.variants.VariantMetadata icebergMetadata = 
icebergVariant.metadata();
+
+    // For Flink variant, we expect BinaryVariant which is the standard implementation
+    assertThat(flinkVariant)
+        .as("Flink variant should be a BinaryVariant")
+        .isInstanceOf(org.apache.flink.types.variant.BinaryVariant.class);
+
+    org.apache.flink.types.variant.BinaryVariant binaryVariant =
+        (org.apache.flink.types.variant.BinaryVariant) flinkVariant;
+
+    // Compare the binary data directly
+    byte[] flinkValueBytes = binaryVariant.getValue();
+    byte[] flinkMetadataBytes = binaryVariant.getMetadata();
+
+    // Verify that both variants contain data
+    assertThat(flinkValueBytes).as("Flink variant should contain value bytes").isNotNull();
+    assertThat(flinkMetadataBytes).as("Flink variant should contain metadata bytes").isNotNull();
+  }

Review Comment:
   it seems the comparison is incomplete. There is no actual comparison of the byte[] contents.
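   
   For example, something along these lines would complete it (a sketch only; it assumes `VariantValue` and `VariantMetadata` expose `sizeInBytes()` and `writeTo(ByteBuffer, int)`, so adjust to whatever serialization helpers the test module already uses):
   ```
   // serialize the Iceberg value and metadata, then compare byte-for-byte
   ByteBuffer valueBuffer =
       ByteBuffer.allocate(icebergValue.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
   icebergValue.writeTo(valueBuffer, 0);
   assertThat(flinkValueBytes)
       .as("Variant value bytes should match")
       .isEqualTo(valueBuffer.array());
   
   ByteBuffer metadataBuffer =
       ByteBuffer.allocate(icebergMetadata.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
   icebergMetadata.writeTo(metadataBuffer, 0);
   assertThat(flinkMetadataBytes)
       .as("Variant metadata bytes should match")
       .isEqualTo(metadataBuffer.array());
   ```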



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkVariants.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.ValueArray;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantPrimitive;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.VariantValue;
+import org.apache.iceberg.variants.Variants;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestFlinkVariants {

Review Comment:
   can we extract a base test class to share the common code between the Flink and Spark variant tests?
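   
   Roughly the shape it could take (names here are hypothetical; each engine module would subclass it and supply its own Parquet round-trip):
   ```
   import java.io.IOException;
   import org.apache.iceberg.variants.VariantMetadata;
   import org.apache.iceberg.variants.VariantValue;
   
   // would hold the shared PRIMITIVES / UNSUPPORTED_PRIMITIVES fixtures and the
   // generic round-trip assertions; only the write-then-read path is engine-specific
   public abstract class VariantRoundTripTestBase {
     protected abstract VariantValue roundTrip(VariantMetadata metadata, VariantValue value)
         throws IOException;
   }
   ```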



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -605,4 +627,50 @@ protected Object get(RowData struct, int index) {
       return fieldGetter[index].getFieldOrNull(struct);
     }
   }
+
+  /** Variant writer converts from Flink Variant to Iceberg Variant */
+  public static class VariantWriter implements ParquetValueWriter<Variant> {
+    private final ParquetValueWriter<org.apache.iceberg.variants.Variant> writer;
+
+    private VariantWriter(ParquetValueWriter<?> writer) {
+      this.writer = (ParquetValueWriter<org.apache.iceberg.variants.Variant>) writer;
+    }
+
+    @Override
+    public void write(int repetitionLevel, Variant variant) {
+      // Flink's Variant implementation uses BinaryVariant as the standard
+      // Based on FLIP-521
+      if (!(variant instanceof BinaryVariant)) {
+        throw new IllegalArgumentException(
+            "Expected BinaryVariant but got: " + 
variant.getClass().getSimpleName());
+      }
+
+      BinaryVariant binaryVariant = (BinaryVariant) variant;
+
+      byte[] metadataBytes = binaryVariant.getMetadata();

Review Comment:
   nit: no need for these two local variables



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java:
##########
@@ -297,6 +301,21 @@ public Optional<ParquetValueReader<?>> visit(
       }
     }
 
+    @Override
+    public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
+      return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
+    }
+
+    @Override
+    public ParquetValueReader<?> variant(
+        Types.VariantType expected, GroupType group, ParquetValueReader<?> result) {
+      if (expected == null) {

Review Comment:
   I didn't see a similar null check in other places. Is this required?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java:
##########
@@ -83,6 +84,11 @@ public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType val
     return new MapType(keyResult.copy(false), valueResult.copy(map.isValueOptional()));
   }
 
+  @Override
+  public LogicalType variant(Types.VariantType variant) {
+    return new VariantType(false); // VariantType is always non-nullable in Flink

Review Comment:
   > VariantType is always non-nullable in Flink
   
   Is this correct? I saw plenty of places in Flink where a nullable variant type is used.
   
   https://github.com/search?q=repo%3Aapache%2Fflink%20%22new%20VariantType()%22&type=code
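   
   For illustration (hedged; based on the linked usages and the usual Flink `LogicalType` convention that the no-arg constructor defaults to nullable):
   ```
   // both forms appear in the Flink codebase; nullability is a constructor argument
   LogicalType nullableVariant = new VariantType();      // isNullable() == true
   LogicalType nonNullVariant = new VariantType(false);  // isNullable() == false
   ```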



##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkVariants.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.file.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.inmemory.InMemoryOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.ValueArray;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantPrimitive;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.VariantValue;
+import org.apache.iceberg.variants.Variants;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public class TestFlinkVariants {
+
+  @TempDir private Path temp;
+
+  private static final VariantPrimitive<?>[] PRIMITIVES =
+      new VariantPrimitive[] {
+        Variants.ofNull(),
+        Variants.of(true),
+        Variants.of(false),
+        Variants.of((byte) 34),
+        Variants.of((byte) -34),
+        Variants.of((short) 1234),
+        Variants.of((short) -1234),
+        Variants.of(12345),
+        Variants.of(-12345),
+        Variants.of(9876543210L),
+        Variants.of(-9876543210L),
+        Variants.of(10.11F),
+        Variants.of(-10.11F),
+        Variants.of(14.3D),
+        Variants.of(-14.3D),
+        Variants.ofIsoDate("2024-11-07"),
+        Variants.ofIsoDate("1957-11-07"),
+        Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00"),
+        Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00"),
+        Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456"),
+        Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456"),
+        Variants.of(new BigDecimal("12345.6789")), // decimal4
+        Variants.of(new BigDecimal("-12345.6789")), // decimal4
+        Variants.of(new BigDecimal("123456789.987654321")), // decimal8
+        Variants.of(new BigDecimal("-123456789.987654321")), // decimal8
+        Variants.of(new BigDecimal("9876543210.123456789")), // decimal16
+        Variants.of(new BigDecimal("-9876543210.123456789")), // decimal16
+        Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})),
+        Variants.of("iceberg"),
+        Variants.ofUUID("f24f9b64-81fa-49d1-b74e-8c09a6e31c56"),
+      };
+
+  private static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES =
+      new VariantPrimitive[] {
+        Variants.ofIsoTime("12:33:54.123456"),
+        Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"),
+        Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"),
+      };
+
+  @Test
+  public void testIcebergVariantTypeToFlinkVariantType() {
+    // Test that Iceberg's VariantType converts to Flink's VariantType
+    Types.VariantType icebergVariantType = Types.VariantType.get();
+    LogicalType flinkVariantType = FlinkSchemaUtil.convert(icebergVariantType);
+
+    assertThat(flinkVariantType)
+        .isInstanceOf(org.apache.flink.table.types.logical.VariantType.class);
+  }
+
+  @Test
+  public void testFlinkVariantTypeToIcebergVariantType() {
+    org.apache.flink.table.types.logical.VariantType flinkVariantType =
+        new org.apache.flink.table.types.logical.VariantType(false);
+    Types.VariantType icebergVariantType =
+        (Types.VariantType) FlinkSchemaUtil.convert(flinkVariantType);
+
+    assertThat(icebergVariantType).isEqualTo(Types.VariantType.get());

Review Comment:
   the line above already did the type cast, so this assertion is not useful. Maybe the line above should just use the base `Type` and let the assertion do the checking.
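   
   i.e. something like:
   ```
   // declare the result as the base Type so the assertion, not the cast, verifies it
   Type icebergVariantType = FlinkSchemaUtil.convert(flinkVariantType);
   assertThat(icebergVariantType).isEqualTo(Types.VariantType.get());
   ```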



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

