Leven2023 opened a new issue, #11611:
URL: https://github.com/apache/iceberg/issues/11611

   ### Apache Iceberg version
   
   1.5.2
   
   ### Query engine
   
   Other
   
   ### Please describe the bug 🐞
   
   ### Steps to reproduce the bug:
   
   > 1) Create a new table with Parquet as the storage format and a table structure of Schema { Struct{ int, String }, int, int, int } (a schema sketch follows this list);
   
   > 2) Use org.apache.iceberg.io.TaskWriter to write 10,000 rows of data; then delete those 10,000 rows using the Struct{ int, String } column as the key (this should be an equality delete);
   
   > 3) Use org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles to compact the files.
   
   > 4) Read the data again. We expect to read 0 rows, but 9,999 rows are returned.
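
   For reference, here is a rough sketch of the schema described in step 1 (field names and IDs are illustrative, not taken from the original setup):
   ```
   // Schema { Struct{ int, String }, int, int, int } -- names and field IDs are placeholders
   // Types and Schema come from org.apache.iceberg.types.Types and org.apache.iceberg.Schema
   Schema schema = new Schema(
       Types.NestedField.required(1, "key", Types.StructType.of(
           Types.NestedField.required(5, "id", Types.IntegerType.get()),
           Types.NestedField.required(6, "name", Types.StringType.get()))),
       Types.NestedField.required(2, "c1", Types.IntegerType.get()),
       Types.NestedField.required(3, "c2", Types.IntegerType.get()),
       Types.NestedField.required(4, "c3", Types.IntegerType.get()));
   ```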
   
   ### Reason:
   > 1) In Iceberg 1.5.2, SparkActions rewrites data files by reading the data files and delete files of the current snapshot and writing the data back out as Parquet. This process applies equality deletes.
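
   For illustration, a minimal sketch of triggering that rewrite through the SparkActions API (`spark` and `table` are assumed to be an existing SparkSession and an already-loaded Iceberg Table; they are not from the original report):
   ```
   // compact/rewrite the table's data files, applying the pending equality deletes;
   // RewriteDataFiles is org.apache.iceberg.actions.RewriteDataFiles
   RewriteDataFiles.Result result = SparkActions.get(spark)
       .rewriteDataFiles(table)
       .execute();
   ```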
   
   > 2) The Spark equality-delete path reads all delete information through DeleteFilter. In Iceberg 1.5.2, DeleteFilter reads the List<DeleteFile> through BaseDeleteLoader and materializes the records into memory (a Set<>) for later use. When BaseDeleteLoader reads a DeleteFile, it uses org.apache.iceberg.data.parquet.GenericParquetReaders.RecordReader, and that reader reuses a single GenericRecord template while reading. Therefore each record is copied via Record::copy before being materialized into memory.
   ```
   public class BaseDeleteLoader implements DeleteLoader {
     ...
     protected Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema projection) {
       CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
       CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy);
       CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection);
       return materialize(copiedDeletesAsStructs);
     }
     ...
     // materializes the iterable and releases resources so that the result can be cached
     private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
       try (CloseableIterable<T> closeableIterable = iterable) {
         return ImmutableList.copyOf(closeableIterable);
       } catch (IOException e) {
         throw new UncheckedIOException("Failed to close iterable", e);
       }
     }
     ...
   }
   ```
   ```
   public class GenericParquetReaders extends BaseParquetReaders<Record> {
     ...
     private static class RecordReader extends ParquetValueReaders.StructReader<Record, Record> {
       private final GenericRecord template;

       RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
         super(types, readers);
         this.template = struct != null ? GenericRecord.create(struct) : null;
       }

       protected Record newStructData(Record reuse) {
         return (Record) (reuse != null ? reuse : this.template.copy());
       }

       protected Object getField(Record intermediate, int pos) {
         return intermediate.get(pos);
       }

       protected Record buildStruct(Record struct) {
         return struct;
       }

       protected void set(Record struct, int pos, Object value) {
         struct.set(pos, value);
       }
     }
   }
   ```
   
   > 3) GenericRecord.copy() is a shallow copy of the internal Object[] values.
   ```
   public class GenericRecord implements Record, StructLike {
     ...
     public static GenericRecord create(Schema schema) {
       return new GenericRecord(schema.asStruct());
     }
   
     public static GenericRecord create(StructType struct) {
       return new GenericRecord(struct);
     }
   
     private final StructType struct;
     private final int size;
     private final Object[] values;
     private final Map<String, Integer> nameToPos;
   
     private GenericRecord(StructType struct) {
       this.struct = struct;
       this.size = struct.fields().size();
       this.values = new Object[size];
       this.nameToPos = NAME_MAP_CACHE.get(struct);
     }
   
     private GenericRecord(GenericRecord toCopy) {
       this.struct = toCopy.struct;
       this.size = toCopy.size;
       this.values = Arrays.copyOf(toCopy.values, toCopy.values.length);
       this.nameToPos = toCopy.nameToPos;
     }
     ...
   
     @Override
     public GenericRecord copy() {
       return new GenericRecord(this);
     }
     ...
   }
   ```
   **> 4) Because GenericRecord.copy() is a shallow copy, every GenericRecord placed into the List while reading the DeleteFile still holds a reference to the same nested struct object. As the DeleteFile is read, RecordReader keeps loading new data into the reused GenericRecord template, so the nested values referenced by the GenericRecord elements already in the List are overwritten as well. After the 10,000 delete rows have been added to the List, they are effectively all duplicates of the last row read, so only one row is actually deleted and the data read back after Spark applies the equality deletes is 10,000 - 1 = 9,999 rows.**
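
   As a minimal sketch (not from the issue; field names are illustrative), the aliasing can be reproduced directly with the GenericRecord API by mimicking the reader's record reuse:
   ```
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.data.GenericRecord;
   import org.apache.iceberg.data.Record;
   import org.apache.iceberg.types.Types;

   public class ShallowCopyAliasingDemo {
     public static void main(String[] args) {
       Types.StructType keyType = Types.StructType.of(
           Types.NestedField.required(1, "id", Types.IntegerType.get()),
           Types.NestedField.required(2, "name", Types.StringType.get()));
       Schema deleteSchema = new Schema(Types.NestedField.required(3, "key", keyType));

       // reused records, standing in for the reader's record reuse
       GenericRecord reused = GenericRecord.create(deleteSchema);
       GenericRecord reusedKey = GenericRecord.create(keyType);
       reused.set(0, reusedKey);

       // "read" delete row 1 and copy it, as BaseDeleteLoader does via Record::copy
       reusedKey.set(0, 1);
       reusedKey.set(1, "a");
       Record firstCopy = reused.copy(); // shallow copy: still references reusedKey

       // "read" delete row 2 into the same reused nested struct
       reusedKey.set(0, 2);
       reusedKey.set(1, "b");

       // firstCopy now reflects row 2, because the nested struct was shared, not copied
       System.out.println(((Record) firstCopy.get(0)).get(0)); // prints 2, not 1
     }
   }
   ```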
   
   > 5) Since DeleteLoader is also used when applying equality deletes during normal reads of a Parquet table, the same incorrect results appear at read time whenever a client uses a Struct column as the key of an equality delete.
   
   ### Solution:
   **From my perspective, changing GenericRecord's copy from a shallow copy to a deep copy fixes this bug. This requires adjusting the code of org.apache.iceberg.data.GenericRecord#GenericRecord(GenericRecord toCopy). I have verified the feasibility and correctness of this change locally.**
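
   A rough sketch (not a committed fix) of what such a deep copy in GenericRecord(GenericRecord toCopy) could look like; nested Records are copied recursively, other values keep the existing assignment:
   ```
   private GenericRecord(GenericRecord toCopy) {
     this.struct = toCopy.struct;
     this.size = toCopy.size;
     this.values = new Object[toCopy.values.length];
     for (int i = 0; i < toCopy.values.length; i++) {
       Object value = toCopy.values[i];
       // recursively copy nested records instead of sharing the reference
       this.values[i] = (value instanceof Record) ? ((Record) value).copy() : value;
     }
     this.nameToPos = toCopy.nameToPos;
   }
   ```
   Note that nested structs held inside list or map values would still be shared by this sketch and might need similar handling.
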
   > Negative impact:
   > 1) When the schema contains only primitive field types, a deep copy may cost more than the previous shallow copy. The extent of this impact has not yet been measured.
   
   ### Other notes:
   If this bug is caused by a flaw in my reproduction case, or if the problem has already been fixed in a newer Iceberg version, I hope readers or community members can let me know. Thank you very much.
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time

