wombatu-kun commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036762313
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,73 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+ // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
Review Comment:
The singleton is only used from `HoodieSparkRecord.isDeleteRecord()`, which
calls `isDeleteRecordByColumnValue()` — this path never populates the
`avroRecordByRow` cache (`extractDataFromRecord` and `convertToAvroRecord` are
never called on the singleton). The cache stays empty, so thread-safety is a
non-issue in practice. Added a comment documenting this invariant so it doesn't
regress silently.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,84 @@ package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hudi.SparkAdapter
+import java.io.IOException
+import java.util.Properties
import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = mutable.Map()
private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map()
+ // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's
+ // schema differs from the BufferedRecord schema. This handles ExpressionPayload records whose
+ // getInsertValue result carries the data schema (no meta fields) while the BufferedRecord
+ // stores writeSchemaWithMetaFields. Returning the original Avro record from convertToAvroRecord
+ // lets ExpressionPayload.combineAndGetUpdateValue decode bytes with the correct data schema.
+ //
+ // IMPORTANT INVARIANT: The identity link between InternalRow and GenericRecord must survive
+ // from extractDataFromRecord (where the cache is populated) to convertToAvroRecord (where
+ // it is consumed via remove()). Any operation that replaces the InternalRow object in between
+ // — such as BufferedRecord.seal() (which calls InternalRow.copy()), replaceRecord(), or
+ // project() — would break this link and cause fallback to schema-based serialization.
Review Comment:
The leak is bounded — it can only happen for SENTINEL/ignored records that
get cached but bypass `convertToAvroRecord`. The
`SparkFileFormatInternalRecordContext` instance is scoped to a single FileGroup
read, so any leaked entries are GC'd when the reader finishes. Added a
lifecycle comment documenting this.
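
A minimal sketch of the populate-then-consume pattern discussed above, for readers following the thread. This is not Hudi's implementation: `IdentityCacheSketch`, `remember`, `consumeOrFallback`, and `pendingEntries` are hypothetical names, a `List<Integer>` stands in for `InternalRow`, and a `String` stands in for the original Avro record. It only illustrates why a copied row misses an identity-keyed cache and why an unconsumed entry lives no longer than the context instance that holds it.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a reader-scoped record context; not Hudi's actual class. */
final class IdentityCacheSketch {
    // IdentityHashMap compares keys with ==, so only the exact same row object hits the cache.
    private final Map<Object, String> originalByRow = new IdentityHashMap<>();

    /** Populate step (plays the role of extractDataFromRecord in the discussion). */
    public void remember(Object row, String original) {
        originalByRow.put(row, original);
    }

    /** Consume step (plays the role of convertToAvroRecord): remove() hands back an entry at most once. */
    public String consumeOrFallback(Object row) {
        String cached = originalByRow.remove(row);
        return cached != null ? cached : "fallback";
    }

    /** Entries that were cached but never consumed; they are released together with this instance. */
    public int pendingEntries() {
        return originalByRow.size();
    }

    public static void main(String[] args) {
        IdentityCacheSketch ctx = new IdentityCacheSketch();
        List<Integer> row = new ArrayList<>(List.of(1, 2));
        List<Integer> copy = new ArrayList<>(row); // equal content, different object (like a copied row)

        ctx.remember(row, "original-avro-record");
        System.out.println(ctx.consumeOrFallback(copy)); // prints "fallback": the copy breaks the identity link
        System.out.println(ctx.consumeOrFallback(row));  // prints "original-avro-record"
        System.out.println(ctx.pendingEntries());        // prints 0: the entry was removed on consumption
    }
}
```

An entry that is remembered but never consumed (the SENTINEL/ignored case) simply stays in the map until the owning instance becomes unreachable, which is the bounded leak described in the comment.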
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,8 +251,11 @@ public static Collection<Pair<String, Long>> filterKeysFromFile(StoragePath file
return Collections.emptyList();
}
log.info("Going to filter {} keys from file {}", candidateRecordKeys.size(), filePath);
Review Comment:
Correct — this is pre-existing behavior, not a regression from this PR. The
old code was hardcoded to AVRO as well. Added a TODO to respect the merger's
configured record type in a follow-up.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,49 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B
// special case for payloads when there is no previous record
HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema);
- HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
- try {
- if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
- return null;
- } else {
- HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
- // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths
- hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties)
- .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+ Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+ // If convertToAvroRecord returned a cached record with a different schema (e.g., from
+ // extractDataFromRecord caching for ExpressionPayload in the COW write path), the record
+ // is already in write-schema format with correctly evaluated expressions. Convert directly.
+ // Note: SENTINEL records (used by ExpressionPayload to signal "skip this record") always
+ // have null schema (HoodieRecord.EmptyRecord.getSchema() returns null), so they cannot
+ // enter this branch and will always go through the payload path where shouldIgnore handles them.
+ if (record.getSchema() != null && !record.getSchema().equals(recordAvroSchema)) {
+ mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record));
+ } else {
+ HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue()));
+ try {
+ if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+ return null;
+ }
+ // Evaluate the payload to get the insert value
+ Option<IndexedRecord> insertValueOpt = hoodieRecord.getData().getInsertValue(recordAvroSchema, properties);
+ if (insertValueOpt.isPresent()) {
+ GenericRecord insertRecord = (GenericRecord) insertValueOpt.get();
+ HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema();
Review Comment:
The record is emitted immediately after `replaceRecord()` via
`super.handleNonDeletes()` — there is no spill or serialization step between
`replaceRecord` and emit where a stale `schemaId` could cause issues. Added a
comment documenting this no-spill assumption.