wombatu-kun commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036762313


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,73 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's

Review Comment:
   The singleton is only used from `HoodieSparkRecord.isDeleteRecord()`, which calls `isDeleteRecordByColumnValue()`; that path never populates the `avroRecordByRow` cache, since `extractDataFromRecord` and `convertToAvroRecord` are never invoked on the singleton. The cache stays empty, so thread-safety is a non-issue in practice. Added a comment documenting this invariant so it doesn't regress silently.
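   For reference, a minimal standalone sketch of the invariant (illustrative names only, not the actual Hudi classes): a shared instance whose identity map is never written is effectively read-only, and a cheap assertion would make any future regression fail fast.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch, not the real HoodieSparkRecord code: the singleton is
// safe to share across threads only because the delete-check path never writes
// to the identity map.
final class SingletonContextSketch {
  static final SingletonContextSketch INSTANCE = new SingletonContextSketch();
  private final Map<Object, Object> avroRecordByRow = new IdentityHashMap<>();

  boolean isDeleteRecordByColumnValueSketch(Object row) {
    // Invariant: this path only reads column values and never touches the cache.
    assert avroRecordByRow.isEmpty() : "shared singleton must not populate avroRecordByRow";
    return false; // actual column-value comparison elided
  }
}
```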



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,84 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+  // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+  // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+  // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+  // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+  //
+  // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+  // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+  // it is consumed via remove()). Any operation that replaces the InternalRow object in between
+  // — such as BufferedRecord.seal() (which calls InternalRow.copy()), replaceRecord(), or
+  // project() — would break this link and cause fallback to schema-based serialization.

Review Comment:
   The leak is bounded: it can only happen for SENTINEL/ignored records that get cached but bypass `convertToAvroRecord`. The `SparkFileFormatInternalRecordContext` instance is scoped to a single FileGroup read, so any leaked entries are GC'd when the reader finishes. Added a lifecycle comment documenting this.
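   To make the lifecycle concrete, here is a minimal standalone sketch of the populate/consume pattern (illustrative names, not the actual Hudi fields): entries are drained via `remove()` on the convert side, so only records that bypass conversion can linger, and those die with the per-FileGroup instance.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Standalone sketch of the cache lifecycle described in the comment above.
final class AvroRowCacheSketch<R, A> {
  private final Map<R, A> avroRecordByRow = new IdentityHashMap<>();

  // populate side (analogous to extractDataFromRecord)
  void onExtract(R row, A avro) {
    avroRecordByRow.put(row, avro);
  }

  // consume side (analogous to convertToAvroRecord): remove() bounds the map
  A onConvert(R row) {
    return avroRecordByRow.remove(row);
  }

  // entries that bypassed onConvert, e.g. SENTINEL/ignored records
  int leaked() {
    return avroRecordByRow.size();
  }
}
```

Because the map keys on object identity, any copy of the row between the two calls (the `seal()`/`copy()` case from the invariant comment) makes `onConvert` return null, which is exactly the documented fallback to schema-based serialization.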



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,8 +251,11 @@ public static Collection<Pair<String, Long>> filterKeysFromFile(StoragePath file
       return Collections.emptyList();
     }
     log.info("Going to filter {} keys from file {}", candidateRecordKeys.size(), filePath);

Review Comment:
   Correct, this is pre-existing behavior rather than a regression from this PR; the old code was hardcoded to AVRO as well. Added a TODO to respect the merger's configured record type in a follow-up.
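   Rough shape of that follow-up, assuming the configured merger can be threaded into `filterKeysFromFile` (it is not reachable from this call site today); `HoodieRecordMerger#getRecordType()` already reports the engine-native type:

```java
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;

// Sketch only: derive the record type from the merger instead of hardcoding AVRO.
final class RecordTypeResolverSketch {
  static HoodieRecordType resolve(HoodieRecordMerger merger) {
    // fall back to AVRO when no merger is configured, matching current behavior
    return merger == null ? HoodieRecordType.AVRO : merger.getRecordType();
  }
}
```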



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,49 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
         // special case for payloads when there is no previous record
         HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
         GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
-        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
-        try {
-          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-            return null;
-          } else {
-            HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
-            // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
-            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
-                .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+        Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+        // If convertToAvroRecord returned a cached record with a different schema (e.g., from
+        // extractDataFromRecord caching for ExpressionPayload in the COW write path), the record
+        // is already in write-schema format with correctly evaluated expressions. Convert directly.
+        // Note: SENTINEL records (used by ExpressionPayload to signal "skip this record") always
+        // have null schema (HoodieRecord.EmptyRecord.getSchema() returns null), so they cannot
+        // enter this branch and will always go through the payload path where shouldIgnore handles them.
+        if (record.getSchema() != null && !record.getSchema().equals(recordAvroSchema)) {
+          mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record));
+        } else {
+          HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
+          try {
+            if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+              return null;
+            }
+            // Evaluate the payload to get the insert value
+            Option<IndexedRecord> insertValueOpt = hoodieRecord.getData().getInsertValue(recordAvroSchema, properties);
+            if (insertValueOpt.isPresent()) {
+              GenericRecord insertRecord = (GenericRecord) insertValueOpt.get();
+              HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();

Review Comment:
   The record is emitted immediately after `replaceRecord()` via `super.handleNonDeletes()`; there is no spill or serialization step between `replaceRecord` and emit where a stale `schemaId` could cause issues. Added a comment documenting this no-spill assumption.
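   Schematically, with hypothetical names (the real path is `UpdateProcessor.handleNonDeletes`), the assumption is that the rewritten record is handed straight to the emit path in the same call frame:

```java
// Toy illustration of the no-spill assumption: nothing between replace and emit
// can re-read the now-stale schemaId from a spilled or serialized copy.
final class EmitOrderSketch<T> {
  interface Emitter<R> { R emit(R record); }

  T handleNonDeletesSketch(T rewrittenRecord, Emitter<T> superHandler) {
    // replaceRecord(...) has already swapped in the rewritten payload;
    // emit happens immediately, with no buffering or spill in between.
    return superHandler.emit(rewrittenRecord);
  }
}
```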


