Guosmilesmile opened a new pull request, #11662: URL: https://github.com/apache/iceberg/pull/11662
When configuring the distribution mode to RANGE, if a partition field in the data contains null values, SortKey serialization fails and the job restarts continuously. For example, with the partition `bucket(5, name)`, if some rows have a null `name`, the `DataStatisticsOperator` throws a `NullPointerException` while serializing its statistics to send to the coordinator: the primitive `write*` calls auto-unbox the boxed field value, which fails when that value is null.

In `SortKeySerializer.serialize`:

```java
public void serialize(SortKey record, DataOutputView target) throws IOException {
  for (int i = 0; i < size; ++i) {
    int fieldId = transformedFields[i].fieldId();
    Type.TypeID typeId = transformedFields[i].type().typeId();
    switch (typeId) {
      case BOOLEAN:
        target.writeBoolean(record.get(i, Boolean.class));
        break;
      case INTEGER:
      case DATE:
        target.writeInt(record.get(i, Integer.class));
        break;
      ...
    }
  }
}
```

Error log:

```
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NullPointerException
	at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:133) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:49) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.flink.api.common.typeutils.base.MapSerializer.serialize(MapSerializer.java:136) ~[flink-dist-1.19.1.jar:1.19.1]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:113) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:38) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.serializeDataStatistics(StatisticsUtil.java:46) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.StatisticsEvent.createTaskStatisticsEvent(StatisticsEvent.java:51) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator.snapshotState(DataStatisticsOperator.java:227) ~[flink-onejar-4.0.0.jar:?]
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234) ~[flink-dist-1.19.1.jar:1.19.1]
```

If a field is used as a partition field with a transform applied, this scenario will inevitably occur whenever the field contains null values. We believe statistics collection can safely ignore rows whose partition field values are null: if such rows are few, skipping them should not affect the job; if they are many, falling back to the default round-robin strategy is also appropriate. This PR traverses the partition field values and skips statistics collection for any sort keys that contain nulls.
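The skip-nulls idea can be sketched in simplified form. The class and method names below (`NullSkippingStats`, `containsNull`, `collectStats`) are hypothetical stand-ins, not the actual PR code, and a sort key is modeled as a plain `Object[]` of transformed field values rather than Iceberg's `SortKey`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class NullSkippingStats {

  // Returns true if any transformed field value in the key is null.
  static boolean containsNull(Object[] key) {
    for (Object value : key) {
      if (value == null) {
        return true;
      }
    }
    return false;
  }

  // Collects key-frequency statistics, skipping keys with null fields so
  // they never reach a serializer that would NPE on auto-unboxing null.
  static Map<String, Long> collectStats(Object[][] keys) {
    Map<String, Long> stats = new HashMap<>();
    for (Object[] key : keys) {
      if (containsNull(key)) {
        continue; // rows with null partition values fall back to the default routing
      }
      stats.merge(Arrays.toString(key), 1L, Long::sum);
    }
    return stats;
  }

  public static void main(String[] args) {
    Object[][] keys = {{"a", 1}, {null, 2}, {"a", 1}};
    // The key with a null "name" is excluded from the statistics map.
    System.out.println(collectStats(keys));
  }
}
```

Rows excluded from statistics are still written; they simply do not influence the range distribution, which matches the round-robin fallback described above.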