rahil-c commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3069731776


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,88 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+  // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+  // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+  // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+  // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+  //
+  // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+  // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+  // it is consumed via remove()). Any operation that replaces the InternalRow object in between

Review Comment:
   **[Critical]** The `IdentityHashMap<InternalRow, GenericRecord>` cache relies on object identity surviving from `extractDataFromRecord` to `convertToAvroRecord`. Any intermediate `InternalRow.copy()`, `seal()`, `replaceRecord()`, or `project()` silently breaks the link and falls back to schema-based serialization, which produces wrong results for ExpressionPayload (the wrong schema is used for decode).
   
   The extensive comments documenting this invariant are themselves evidence of how fragile it is. Consider either:
   1. A wrapper type that carries the GenericRecord explicitly (e.g. a `TaggedInternalRow`), or
   2. Keying by record key string instead of object identity.
   
   At minimum, add a debug assertion or metric for the case where the cache is non-empty after a FileGroup read completes, so violations are caught instead of silently producing wrong data.
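   To make option 1 concrete, here is a minimal, hedged sketch (all names are hypothetical stand-ins; `Row` and `AvroRecord` substitute for Spark's `InternalRow` and Avro's `GenericRecord`, and none of this is Hudi's actual API) showing why a value-carried link survives copying while an identity-keyed cache does not:

```java
import java.util.IdentityHashMap;
import java.util.Optional;

public class TaggedRowSketch {
  // Hypothetical stand-ins for InternalRow and GenericRecord.
  record Row(int id) {
    Row copyRow() { return new Row(id); } // fresh object, new identity
  }
  record AvroRecord(String schemaName) {}

  // Option 1 from the comment: carry the source Avro record with the row,
  // so the link travels with the value instead of living in a side map.
  record TaggedInternalRow(Row row, Optional<AvroRecord> source) {}

  public static void main(String[] args) {
    AvroRecord avro = new AvroRecord("dataSchemaNoMetaFields");
    Row row = new Row(1);

    // Identity-keyed cache: copying the row silently breaks the link,
    // because IdentityHashMap compares by reference, not equals().
    IdentityHashMap<Row, AvroRecord> cache = new IdentityHashMap<>();
    cache.put(row, avro);
    if (cache.get(row.copyRow()) != null) {
      throw new AssertionError("identity cache should miss on a copy");
    }

    // Wrapper: the link survives copying because it is part of the value.
    TaggedInternalRow tagged = new TaggedInternalRow(row, Optional.of(avro));
    TaggedInternalRow copied =
        new TaggedInternalRow(tagged.row().copyRow(), tagged.source());
    if (!copied.source().orElseThrow().equals(avro)) {
      throw new AssertionError("wrapper should keep the Avro link");
    }
    System.out.println("wrapper keeps the Avro link; identity cache does not");
  }
}
```

   The same reasoning applies to `seal()` or `project()`: anything that produces a new `InternalRow` object defeats reference-keyed lookup, but cannot drop a field of a wrapper type without a compile-visible change.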



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,54 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B

Review Comment:
   **[High]** This branch uses `record.getSchema() != recordAvroSchema` as a heuristic to detect cached ExpressionPayload records. But schema inequality can also occur during legitimate schema evolution (e.g., a reader schema that differs from the writer schema), so the heuristic conflates two unrelated conditions.
   
   Could we instead use an explicit signal (e.g. a boolean flag from the RecordContext indicating the record came from the cache) rather than inferring intent from schema comparison?
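   A hedged sketch of what that explicit signal could look like (all names here are illustrative, not Hudi's actual `BufferedRecord` or `RecordContext` API): the flag, not schema inequality, selects the decode path, so schema evolution alone can no longer trigger it.

```java
public class ExplicitFlagSketch {
  // Hypothetical simplified record: payload bytes stand-in, the schema it
  // was encoded with, and an explicit marker set only by the cache path.
  record BufferedRec(String payload, String schemaId, boolean fromPayloadCache) {}

  // Choose the schema to decode with. Only the explicit flag routes to the
  // record's own data schema; a mere schemaId mismatch (schema evolution)
  // keeps the normal reader-schema path.
  static String decodeSchema(BufferedRec rec, String readerSchemaId) {
    return rec.fromPayloadCache() ? rec.schemaId() : readerSchemaId;
  }

  public static void main(String[] args) {
    // Evolved schema but not cached: reader schema wins despite inequality.
    BufferedRec evolved = new BufferedRec("p", "writerSchemaV1", false);
    if (!decodeSchema(evolved, "readerSchemaV2").equals("readerSchemaV2")) {
      throw new AssertionError("evolution must not trigger the cache path");
    }

    // Cached ExpressionPayload-style record: its own data schema wins.
    BufferedRec cached = new BufferedRec("p", "dataSchemaNoMetaFields", true);
    if (!decodeSchema(cached, "readerSchemaV2").equals("dataSchemaNoMetaFields")) {
      throw new AssertionError("cached record must decode with its own schema");
    }
    System.out.println("flag disambiguates cache path from schema evolution");
  }
}
```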



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]