wombatu-kun commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3036841886


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -111,65 +113,284 @@ class SparkLanceReaderBase(enableVectorizedReader: 
Boolean) extends SparkColumna
         // Read data with column projection (filters not supported yet)
         val arrowReader = lanceReader.readAll(columnNames, null, 
DEFAULT_BATCH_SIZE)
 
-        // Create iterator using shared LanceRecordIterator
-        lanceIterator = new LanceRecordIterator(
-          allocator,
-          lanceReader,
-          arrowReader,
-          requestSchema,
-          filePath
-        )
-
-        // Register cleanup listener
-        Option(TaskContext.get()).foreach { ctx =>
-          ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close())
-        }
-
-        // Create the following projections for schema evolution:
-        // 1. Padding projection: add NULL for missing columns
-        // 2. Casting projection: handle type conversions
-        val schemaUtils = sparkAdapter.getSchemaUtils
-        val paddingProj = 
SparkSchemaTransformUtils.generateNullPaddingProjection(requestSchema, 
requiredSchema)
-        val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
-          schemaUtils.toAttributes(requiredSchema),
-          Some(SQLConf.get.sessionLocalTimeZone),
-          implicitTypeChangeInfo,
-          requiredSchema,
-          new StructType(),
-          schemaUtils
-        )
-
-        // Unify projections by applying padding and then casting for each row
-        val projection: UnsafeProjection = new UnsafeProjection {
-          def apply(row: InternalRow): UnsafeRow =
-            castProj(paddingProj(row))
-        }
-        val projectedIter = lanceIterator.asScala.map(projection.apply)
-
-        // Handle partition columns
-        if (partitionSchema.length == 0) {
-          // No partition columns - return rows directly
-          projectedIter
+        // Decide between batch mode and row mode.
+        // Fall back to row mode if type casting is needed (batch-level type 
casting deferred to follow-up).
+        val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+        if (enableVectorizedReader && !hasTypeChanges) {
+          readBatch(file, allocator, lanceReader, arrowReader, filePath,
+            requestSchema, requiredSchema, partitionSchema)
         } else {
-          // Create UnsafeProjection to convert JoinedRow to UnsafeRow
-          val fullSchema = (requiredSchema.fields ++ 
partitionSchema.fields).map(f =>
-            AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
-          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
-
-          // Append partition values to each row using JoinedRow, then convert 
to UnsafeRow
-          val joinedRow = new JoinedRow()
-          projectedIter.map(row => unsafeProjection(joinedRow(row, 
file.partitionValues)))
+          readRows(file, allocator, lanceReader, arrowReader, filePath,
+            requestSchema, requiredSchema, partitionSchema, 
implicitTypeChangeInfo)

Review Comment:
   Fixed — `lanceReader` and `arrowReader` are now declared as `var`s before the 
`try` block, so the `catch` block can close all three resources (`arrowReader`, 
`lanceReader`, and `allocator`) in reverse order. Exceptions thrown while 
closing are handled as suppressed exceptions so they do not mask the original 
error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to