stevenzwu commented on code in PR #12049:
URL: https://github.com/apache/iceberg/pull/12049#discussion_r1932969801

##########
data/src/test/java/org/apache/iceberg/data/DataTest.java:
##########
@@ -486,4 +490,20 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testWriteNullValueForRequiredType() {
+    Schema schema =
+        new Schema(
+            required(0, "id", LongType.get()), required(1, "string", Types.StringType.get()));
+
+    GenericRecord genericRecord = GenericRecord.create(schema);
+    genericRecord.set(0, 42L);
+    genericRecord.set(1, null);
+
+    Assertions.assertThrows(
+        // The actual exception depends on the implementation, e.g.
+        // NullPointerException or IllegalArgumentException.
+        Exception.class, () -> writeAndValidate(schema, List.of(genericRecord)));

Review Comment:
   nit: I didn't find any use of `List.of` in the Iceberg code base. Usually `Lists.newArrayList` or `ImmutableList.of` is used. Maybe check whether checkstyle flags this or not.

##########
data/src/test/java/org/apache/iceberg/data/DataTest.java:
##########
@@ -486,4 +490,20 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testWriteNullValueForRequiredType() {
+    Schema schema =
+        new Schema(
+            required(0, "id", LongType.get()), required(1, "string", Types.StringType.get()));
+
+    GenericRecord genericRecord = GenericRecord.create(schema);
+    genericRecord.set(0, 42L);
+    genericRecord.set(1, null);
+
+    Assertions.assertThrows(

Review Comment:
   Style is to use AssertJ.

##########
data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java:
##########
@@ -35,14 +35,26 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 public class TestGenericData extends DataTest {
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, schema);
   }
 
+  @Override
+  protected void writeAndValidate(Schema writeSchema, List<Record> expectedData)
+      throws IOException {
+    writeAndValidate(writeSchema, writeSchema, expectedData);
+  }
+
   @Override
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
+    List<Record> data = RandomGenericData.generate(writeSchema, 100, 0L);
+    writeAndValidate(writeSchema, expectedSchema, data);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)

Review Comment:
   nit: protected -> private?
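Combining the two `DataTest` suggestions above (`ImmutableList.of` instead of `List.of`, and AssertJ instead of JUnit's `Assertions.assertThrows`), a minimal sketch of the rewritten assertion could look like the following; the broad `Exception.class` expectation is kept on purpose because, as the test comment notes, the concrete exception depends on the writer implementation:

```
// Sketch only; assumes these imports in DataTest.java:
//   import static org.assertj.core.api.Assertions.assertThatThrownBy;
//   import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@Test
public void testWriteNullValueForRequiredType() {
  Schema schema =
      new Schema(
          required(0, "id", LongType.get()), required(1, "string", Types.StringType.get()));

  GenericRecord genericRecord = GenericRecord.create(schema);
  genericRecord.set(0, 42L);
  genericRecord.set(1, null);

  // The actual exception still depends on the implementation (e.g. NullPointerException
  // or IllegalArgumentException), so only assert that the write fails.
  assertThatThrownBy(() -> writeAndValidate(schema, ImmutableList.of(genericRecord)))
      .isInstanceOf(Exception.class);
}
```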
##########
data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java:
##########
@@ -46,14 +46,25 @@
 import org.junit.jupiter.api.Test;
 
 public class TestGenericData extends DataTest {
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     writeAndValidate(schema, schema);
   }
 
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expected) throws IOException {
+    writeAndValidate(schema, schema, expected);
+  }
+
   @Override
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 12228L);
+    List<Record> data = RandomGenericData.generate(writeSchema, 100, 12228L);
+    writeAndValidate(writeSchema, expectedSchema, data);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)

Review Comment:
   nit: protected -> private

##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java:
##########
@@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException {
       assertThat(records).isExhausted();
     }
   }
+
+  @Override
+  @Disabled("ORC file format supports null values even for required fields")

Review Comment:
   Similarly, should we just modify the test code behavior here (instead of ignoring/disabling it)?

##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java:
##########
@@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD
     @Override
     protected Object get(RowData struct, int index) {
+      // Be sure to check for null values, even if the field is required. Without an explicit null
+      // check, BinaryRowData will ignore the null flag and parse random bytes as actual values.
+      // This will produce incorrect writes instead of failing with a NullPointerException.
+      if (struct.isNullAt(index)) {
+        return null;
+      }

Review Comment:
   Do we need to fix `FlinkOrcWriters` as well?

##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)
+        .tableSchema(flinkSchema)
+        .writeParallelism(parallelism)
+        .distributionMode(DistributionMode.HASH)
+        .append();
+
+    try {

Review Comment:
   Style: search for `assertThatThrownBy` in the code base. If possible, it would be great to include the error message in the `assertThatThrownBy` check.

##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)
+        .tableSchema(flinkSchema)
+        .writeParallelism(parallelism)
+        .distributionMode(DistributionMode.HASH)

Review Comment:
   nit: for this test, we can remove this config and use `none` distribution.

##########
data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java:
##########
@@ -237,4 +243,10 @@ private void writeAndValidateRecords(Schema schema, List<Record> expected) throw
       DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
     }
   }
+
+  @Override
+  @Ignore("ORC file format supports null values even for required fields")

Review Comment:
   Then we shouldn't ignore this test. Maybe modify the test behavior instead?

##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java:
##########
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
                 schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
+      binaryRowList.add(binaryRow);
+    }
+    writeAndValidate(binaryRowList, schema);

Review Comment:
   nit: Iceberg style is to add an empty line after a control block (if, for, while, etc.).
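To make the `assertThatThrownBy` suggestion for `testErrorOnNullForRequiredField` concrete, a hedged sketch of what could replace the `try` block is shown below; the root-cause type and the message are placeholders, since what actually surfaces depends on the file format and on where the null check fails:

```
// Sketch only; replaces the try/catch around job execution in testErrorOnNullForRequiredField.
// The expected root cause and message are placeholders, not verified values.
assertThatThrownBy(() -> env.execute())
    .hasRootCauseInstanceOf(NullPointerException.class)
    .hasRootCauseMessage("<expected error message>");
```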
##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java:
##########
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assumptions.assumeFalse(
+        format == FileFormat.ORC, "ORC file format supports null values even for required fields.");
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TableIdentifier.of(DATABASE, "t2"),
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(tableLoader)

Review Comment:
   Technically this is incorrect, as this `TableLoader` loads the default table (not `table2`). You can look at an example in this class:
   ```
   TableLoader leftTableLoader =
       TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("left"));
   ```
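A hedged sketch of applying that `TableLoader.fromCatalog` example to this test: build a loader for `table2` explicitly and pass it to the sink builder instead of the default `tableLoader` (the variable name below is hypothetical):

```
// Hypothetical variable name; loads the table this sink actually writes to.
TableLoader table2Loader =
    TableLoader.fromCatalog(
        CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of(DATABASE, "t2"));

IcebergSink.forRow(dataStream, flinkSchema)
    .table(table2)
    .tableLoader(table2Loader) // instead of the default tableLoader
    .tableSchema(flinkSchema)
    .writeParallelism(parallelism)
    .append();
```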