Leven2023 opened a new issue, #11611: URL: https://github.com/apache/iceberg/issues/11611
### Apache Iceberg version

1.5.2

### Query engine

Other

### Please describe the bug 🐞

### Steps to reproduce the bug

1) Create a new table with Parquet as the storage format and the schema `Schema { Struct { int, String }, int, int, int }`.
2) Use `org.apache.iceberg.io.TaskWriter` to write 10,000 rows of data; then delete those 10,000 rows using the `Struct { int, String }` column as the primary key (i.e., via equality deletes).
3) Run `org.apache.iceberg.spark.actions.SparkActions#rewriteDataFiles` to compact the files.
4) Read the data again. We expect to read 0 rows, but 9,999 rows are returned.

### Reason

1) In Iceberg 1.5.2, `SparkActions` rewrites data files by reading the data files and delete files of the current snapshot and writing the surviving rows back out as Parquet. This process applies equality deletes.

2) To apply equality deletes, Spark must read all delete rows through `DeleteFilter`. In Iceberg 1.5.2, `DeleteFilter` loads each entry of the `List<DeleteFile>` through `BaseDeleteLoader` and materializes the rows into an in-memory `Set<>` for subsequent lookups. While reading a delete file, `BaseDeleteLoader` uses `org.apache.iceberg.data.parquet.GenericParquetReaders.RecordReader`, which reuses a single `GenericRecord` template across rows. Each row is therefore copied via `Record::copy` before being materialized:

```java
public class BaseDeleteLoader implements DeleteLoader {
  ...
  protected Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema projection) {
    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
    CloseableIterable<Record> copiedDeletes = CloseableIterable.transform(deletes, Record::copy);
    CloseableIterable<StructLike> copiedDeletesAsStructs = toStructs(copiedDeletes, projection);
    return materialize(copiedDeletesAsStructs);
  }
  ...
  // materializes the iterable and releases resources so that the result can be cached
  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
    try (CloseableIterable<T> closeableIterable = iterable) {
      return ImmutableList.copyOf(closeableIterable);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close iterable", e);
    }
  }
  ...
}
```

```java
public class GenericParquetReaders extends BaseParquetReaders<Record> {
  ...
  private static class RecordReader extends ParquetValueReaders.StructReader<Record, Record> {
    private final GenericRecord template;

    RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
      super(types, readers);
      this.template = struct != null ? GenericRecord.create(struct) : null;
    }

    protected Record newStructData(Record reuse) {
      return (Record) (reuse != null ? reuse : this.template.copy());
    }

    protected Object getField(Record intermediate, int pos) {
      return intermediate.get(pos);
    }

    protected Record buildStruct(Record struct) {
      return struct;
    }

    protected void set(Record struct, int pos, Object value) {
      struct.set(pos, value);
    }
  }
}
```

3) `GenericRecord.copy()` performs a shallow copy of the internal `Object[] values` array:

```java
public class GenericRecord implements Record, StructLike {
  ...
  public static GenericRecord create(Schema schema) {
    return new GenericRecord(schema.asStruct());
  }

  public static GenericRecord create(StructType struct) {
    return new GenericRecord(struct);
  }

  private final StructType struct;
  private final int size;
  private final Object[] values;
  private final Map<String, Integer> nameToPos;

  private GenericRecord(StructType struct) {
    this.struct = struct;
    this.size = struct.fields().size();
    this.values = new Object[size];
    this.nameToPos = NAME_MAP_CACHE.get(struct);
  }

  private GenericRecord(GenericRecord toCopy) {
    this.struct = toCopy.struct;
    this.size = toCopy.size;
    this.values = Arrays.copyOf(toCopy.values, toCopy.values.length);
    this.nameToPos = toCopy.nameToPos;
  }
  ...
  @Override
  public GenericRecord copy() {
    return new GenericRecord(this);
  }
  ...
}
```

**4) Because `GenericRecord.copy()` is a shallow copy, every copied record placed in the list of equality-delete rows still holds a reference to the nested struct owned by the reader's reused template. As the delete file is read, `RecordReader` keeps loading new data into that template, so the nested values referenced by every record already in the list are overwritten as well. After all 10,000 delete rows are loaded, the list effectively contains 10,000 duplicates of the last row read. When Spark then applies the equality deletes, only that one row is removed, so 10,000 - 1 = 9,999 rows remain.**

5) Since `DeleteLoader` is also used to apply equality deletes when reading a Parquet table, the same incorrect results appear during normal reads whenever a client uses a struct-typed column as the equality-delete key.

### Solution

**From a personal perspective, changing the copy of `GenericRecord` from a shallow copy to a deep copy would fix this bug.**
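The aliasing and the proposed fix can both be illustrated with a minimal, Iceberg-free sketch. This is plain Java with illustrative names (`SimpleRecord`, `readDeletes` are stand-ins, not Iceberg APIs): a reused nested "template" struct is mutated in place for each row, and the delete rows are materialized with either a shallow or a deep copy.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for GenericRecord (NOT Iceberg code): field values
// live in an Object[], and the shallow copy duplicates only the array itself.
public class StructDeleteKeyDemo {

    static class SimpleRecord {
        final Object[] values;

        SimpleRecord(int size) {
            this.values = new Object[size];
        }

        // Shallow copy, as in GenericRecord(GenericRecord toCopy): the array
        // is duplicated, but a nested SimpleRecord element is still shared.
        SimpleRecord shallowCopy() {
            SimpleRecord copy = new SimpleRecord(values.length);
            System.arraycopy(values, 0, copy.values, 0, values.length);
            return copy;
        }

        // Sketch of the proposed fix: recursively copy nested records so no
        // element of the reader's reused template leaks into the delete rows.
        SimpleRecord deepCopy() {
            SimpleRecord copy = new SimpleRecord(values.length);
            for (int i = 0; i < values.length; i++) {
                Object v = values[i];
                copy.values[i] = (v instanceof SimpleRecord) ? ((SimpleRecord) v).deepCopy() : v;
            }
            return copy;
        }
    }

    // Simulates the reader: one reused nested template struct is mutated in
    // place per row, and each row is copied into the delete list.
    static List<SimpleRecord> readDeletes(boolean deep, int rows) {
        SimpleRecord nestedTemplate = new SimpleRecord(2);
        List<SimpleRecord> deletes = new ArrayList<>();
        for (int row = 0; row < rows; row++) {
            nestedTemplate.values[0] = row;           // int part of the key
            nestedTemplate.values[1] = "key-" + row;  // String part of the key
            SimpleRecord top = new SimpleRecord(1);
            top.values[0] = nestedTemplate;           // struct-typed key column
            deletes.add(deep ? top.deepCopy() : top.shallowCopy());
        }
        return deletes;
    }

    public static void main(String[] args) {
        // Shallow copies: every row in the list aliases the same nested
        // struct, so all of them end up equal to the last row read.
        for (SimpleRecord r : readDeletes(false, 3)) {
            System.out.println(((SimpleRecord) r.values[0]).values[1]); // key-2, key-2, key-2
        }
        // Deep copies: each row keeps its own nested values.
        for (SimpleRecord r : readDeletes(true, 3)) {
            System.out.println(((SimpleRecord) r.values[0]).values[1]); // key-0, key-1, key-2
        }
    }
}
```

A real patch would also need to decide how to copy other mutable value types (e.g. `byte[]`, `List`, `Map`), which this sketch does not cover.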
This requires adjusting the code of `org.apache.iceberg.data.GenericRecord#GenericRecord(GenericRecord toCopy)`. I have verified the feasibility and correctness of this solution locally.

Negative impact:

1) When the schema contains only primitive field types, a deep copy may be more expensive than the current shallow copy. The size of this overhead has not yet been measured.

### Other notes

If this bug is due to a flaw in my demo case, or if the problem has already been fixed in a newer Iceberg version, I hope readers or community members can let me know. Thank you very much.

### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org