rahil-c commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3069731776
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,88 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
  lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
  private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
  private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+  // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+  // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+  // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+  // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+  //
+  // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+  // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+  // it is consumed via remove()). Any operation that replaces the InternalRow object in between
Review Comment:
**[Critical]** The `IdentityHashMap<InternalRow, GenericRecord>` cache
relies on object identity surviving from `extractDataFromRecord` to
`convertToAvroRecord`. Any intermediate `InternalRow.copy()`, `seal()`,
`replaceRecord()`, or `project()` silently breaks the link, and the code falls
back to schema-based serialization, which produces wrong results for
ExpressionPayload (the wrong schema is used for decode).
The extensive comments documenting this invariant are actually evidence of
how fragile it is. Consider either:
1. A wrapper type that carries the GenericRecord explicitly (e.g. a
`TaggedInternalRow`), or
2. Keying by record key string instead of object identity
At minimum, add a debug assertion or metric when the cache is non-empty
after a FileGroup read completes, so violations are caught rather than silently
producing wrong data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]