stevenzwu commented on code in PR #12049:
URL: https://github.com/apache/iceberg/pull/12049#discussion_r1932969801


##########
data/src/test/java/org/apache/iceberg/data/DataTest.java:
##########
@@ -486,4 +490,20 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau
 
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testWriteNullValueForRequiredType() {
+    Schema schema =
+        new Schema(
+            required(0, "id", LongType.get()), required(1, "string", Types.StringType.get()));
+
+    GenericRecord genericRecord = GenericRecord.create(schema);
+    genericRecord.set(0, 42L);
+    genericRecord.set(1, null);
+
+    Assertions.assertThrows(
+        // The actual exception depends on the implementation, e.g.
+        // NullPointerException or IllegalArgumentException.
+        Exception.class, () -> writeAndValidate(schema, List.of(genericRecord)));

Review Comment:
   nit: I didn't find any use of `List.of` in the Iceberg code base; usually `Lists.newArrayList` or `ImmutableList.of` is used. Maybe check whether checkstyle flags this.
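   
   For example, a hedged sketch using the relocated Guava `ImmutableList` (the import path mirrors the `Lists` import already used in these tests):
   ```
   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

   // same check as above, just with an Iceberg-style immutable list
   Assertions.assertThrows(
       Exception.class, () -> writeAndValidate(schema, ImmutableList.of(genericRecord)));
   ```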



##########
data/src/test/java/org/apache/iceberg/data/DataTest.java:
##########
@@ -486,4 +490,20 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau
 
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testWriteNullValueForRequiredType() {
+    Schema schema =
+        new Schema(
+            required(0, "id", LongType.get()), required(1, "string", Types.StringType.get()));
+
+    GenericRecord genericRecord = GenericRecord.create(schema);
+    genericRecord.set(0, 42L);
+    genericRecord.set(1, null);
+
+    Assertions.assertThrows(

Review Comment:
   style is to use AssertJ assertions (e.g. `assertThatThrownBy`) rather than JUnit's `Assertions.assertThrows`
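   
   For example, a hedged AssertJ sketch:
   ```
   import static org.assertj.core.api.Assertions.assertThatThrownBy;

   // the concrete exception type depends on the file format implementation,
   // so only the broad type is asserted here
   assertThatThrownBy(() -> writeAndValidate(schema, ImmutableList.of(genericRecord)))
       .isInstanceOf(Exception.class);
   ```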



##########
data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java:
##########
@@ -35,14 +35,26 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 public class TestGenericData extends DataTest {
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, schema);
   }
 
+  @Override
+  protected void writeAndValidate(Schema writeSchema, List<Record> expectedData)
+      throws IOException {
+    writeAndValidate(writeSchema, writeSchema, expectedData);
+  }
+
   @Override
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+    List<Record> data = RandomGenericData.generate(writeSchema, 100, 0L);
+    writeAndValidate(writeSchema, expectedSchema, data);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)

Review Comment:
   nit: protected -> private?



##########
data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java:
##########
@@ -46,14 +46,25 @@
 import org.junit.jupiter.api.Test;
 
 public class TestGenericData extends DataTest {
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, schema);
   }
 
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expected) throws IOException {
+    writeAndValidate(schema, schema, expected);
+  }
+
   @Override
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 12228L);
+    List<Record> data = RandomGenericData.generate(writeSchema, 100, 12228L);
+    writeAndValidate(writeSchema, expectedSchema, data);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)

Review Comment:
   nit: protected -> private



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java:
##########
@@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException {
       assertThat(records).isExhausted();
     }
   }
+
+  @Override
+  @Disabled("ORC file format supports null values even for required fields")

Review Comment:
   similarly, we should probably just modify the test behavior here (instead of ignoring/disabling it).



##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD
 
     @Override
     protected Object get(RowData struct, int index) {
+      // Be sure to check for null values, even if the field is required. Without an explicit null
+      // check, BinaryRowData will ignore the null flag and parse random bytes as actual values.
+      // This will produce incorrect writes instead of failing with a NullPointerException.
+      if (struct.isNullAt(index)) {
+        return null;
+      }

Review Comment:
   do we need to fix the `FlinkOrcWriters`?



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)
+        .tableSchema(flinkSchema)
+        .writeParallelism(parallelism)
+        .distributionMode(DistributionMode.HASH)
+        .append();
+
+    try {

Review Comment:
   style: search `assertThatThrownBy` in the code base.
   
   If possible, it would be great to include the error message in the `assertThatThrownBy` check.
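   
   For example, a hedged sketch (the lambda body, exception type, and message fragment are all assumptions; adjust them to whatever the failing job actually throws):
   ```
   assertThatThrownBy(() -> env.execute())
       .isInstanceOf(Exception.class)
       // hypothetical fragment: data2 is the required field that was nulled out
       .hasMessageContaining("data2");
   ```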



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)
+        .tableSchema(flinkSchema)
+        .writeParallelism(parallelism)
+        .distributionMode(DistributionMode.HASH)

Review Comment:
   nit: for this test, we can remove this config and use `none` distribution.
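   
   e.g. drop the `.distributionMode(DistributionMode.HASH)` call, or set it explicitly (minimal sketch):
   ```
   .distributionMode(DistributionMode.NONE)
   ```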



##########
data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java:
##########
@@ -237,4 +243,10 @@ private void writeAndValidateRecords(Schema schema, List<Record> expected) throw
       DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
     }
   }
+
+  @Override
+  @Ignore("ORC file format supports null values even for required fields")

Review Comment:
   then we shouldn't ignore this test. maybe modify the test behavior instead?
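   
   For example, a hedged sketch reusing `writeAndValidateRecords` from this class: since ORC accepts nulls even for required fields, the override can assert that the round trip succeeds instead of expecting an exception.
   ```
   @Override
   protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
     // ORC tolerates null values for required fields, so the write/read round trip should succeed
     writeAndValidateRecords(schema, expectedData);
   }
   ```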



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java:
##########
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
                 schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
+      binaryRowList.add(binaryRow);
+    }
+    writeAndValidate(binaryRowList, schema);

Review Comment:
   nit: Iceberg style is to add an empty line after a control block (if, for, 
while etc.)
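   
   i.e. the loop from the diff above, with a blank line after it:
   ```
   for (Record record : expectedData) {
     RowData rowData = RowDataConverter.convert(schema, record);
     BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
     binaryRowList.add(binaryRow);
   }

   writeAndValidate(binaryRowList, schema);
   ```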



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)

Review Comment:
   technically this is incorrect, as this `TableLoader` loads the default table (not `table2`).
   
   you can look at an example in this class:
   ```
   TableLoader leftTableLoader =
           TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("left"));
   ```
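   
   A hedged sketch of the analogous loader for `table2` (identifier taken from the `createTable` call above):
   ```
   TableLoader table2Loader =
       TableLoader.fromCatalog(
           CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of(DATABASE, "t2"));
   ```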



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

