Guosmilesmile opened a new pull request, #11662:
URL: https://github.com/apache/iceberg/pull/11662

   When the distribution mode is set to RANGE and a partition field in the data contains null values, SortKey serialization fails, causing the job to restart continuously.
   
   For example, if we set the partition as `bucket(5, name)` and some rows have a null value for `name`, the `DataStatisticsOperator` throws an error while serializing the statistics and sending them to the coordinator.
   
   In `SortKeySerializer.serialize`:
   ```
     public void serialize(SortKey record, DataOutputView target) throws IOException {
       for (int i = 0; i < size; ++i) {
         int fieldId = transformedFields[i].fieldId();
         Type.TypeID typeId = transformedFields[i].type().typeId();
         switch (typeId) {
           case BOOLEAN:
             target.writeBoolean(record.get(i, Boolean.class));
             break;
           case INTEGER:
           case DATE:
             target.writeInt(record.get(i, Integer.class));
             break;
           ...
         }
       }
     }
   ```
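   The failure comes from auto-unboxing: `record.get(i, Integer.class)` returns null for a null partition field, and passing that `Integer` to `writeInt(int)` unboxes it, throwing a `NullPointerException`. A minimal sketch of the mechanism, using plain `java.io` streams instead of Flink's `DataOutputView` and a hypothetical helper name:

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;

   public class NullSortKeyDemo {

     // Stand-in for the failing call path: the serializer reads the field as a
     // boxed Integer and hands it to writeInt(int), which auto-unboxes it.
     static boolean failsOnNull(Integer fieldValue) {
       DataOutputStream target = new DataOutputStream(new ByteArrayOutputStream());
       try {
         target.writeInt(fieldValue); // NPE here when fieldValue is null
         return false;
       } catch (NullPointerException e) {
         return true;
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
     }

     public static void main(String[] args) {
       System.out.println(failsOnNull(null)); // unboxing null throws NPE
       System.out.println(failsOnNull(42));   // a non-null value serializes fine
     }
   }
   ```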
   
   Error log:
   
   ```
   Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NullPointerException
       at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:133) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:49) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.flink.api.common.typeutils.base.MapSerializer.serialize(MapSerializer.java:136) ~[flink-dist-1.19.1.jar:1.19.1]
       at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:113) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:38) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.serializeDataStatistics(StatisticsUtil.java:46) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.iceberg.flink.sink.shuffle.StatisticsEvent.createTaskStatisticsEvent(StatisticsEvent.java:51) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator.snapshotState(DataStatisticsOperator.java:227) ~[flink-onejar-4.0.0.jar:?]
       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234) ~[flink-dist-1.19.1.jar:1.19.1]
   ```
   
   We think that whenever a field is used as a partition field with a transform applied, this scenario will inevitably occur if the field contains null values. During statistics collection we can ignore rows whose relevant partition field is null. If the number of such rows is small, skipping them should not affect the job; if it is large, falling back to the default round-robin strategy would also be appropriate.
   
   
   This PR traverses the partition field values and skips statistics collection for any entry that contains a null value.
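   A minimal sketch of that idea, with hypothetical class and method names rather than the actual PR diff: before recording a sort key in the frequency map, check its fields and skip rows containing null, so those rows fall back to the default distribution.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class NullSafeStats {

     // Record the key in the frequency map only when no field is null;
     // rows with a null partition value are excluded from the statistics.
     static void update(Map<List<Object>, Long> stats, List<Object> key) {
       for (Object field : key) {
         if (field == null) {
           return; // skip statistics for this row
         }
       }
       stats.merge(key, 1L, Long::sum);
     }

     public static void main(String[] args) {
       Map<List<Object>, Long> stats = new HashMap<>();
       update(stats, Arrays.<Object>asList("a", 1));
       update(stats, Arrays.<Object>asList(null, 2)); // skipped
       update(stats, Arrays.<Object>asList("a", 1));
       System.out.println(stats); // only the non-null key is counted
     }
   }
   ```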
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

