rahil-c commented on code in PR #18098: URL: https://github.com/apache/hudi/pull/18098#discussion_r3075320688
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala: ########## @@ -0,0 +1,754 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.blob + +import org.apache.hudi.HoodieSparkUtils.sparkAdapter +import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType} +import org.apache.hudi.io.SeekableDataInputStream +import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StorageConfiguration, StoragePath} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, SpecificInternalRow} +import org.apache.spark.sql.types.{BinaryType, BlobType, DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import java.io.InputStream + +import scala.collection.mutable.ArrayBuffer + +/** + * Batched byte range reader that optimizes I/O by combining consecutive reads for out-of-line data. + * + * This reader analyzes sequences of read requests within a partition and merges + * consecutive or nearby reads into single I/O operations. 
This significantly reduces + * the number of seeks and reads when processing sorted data. + * + * <h3>Schema Requirement:</h3> + * The blob column must match the schema defined in {@link HoodieSchema.Blob}: + * <pre> + * struct { + * type: string // "inline" or "out_of_line" + * data: binary (nullable) // inline data (null for out_of_line) + * reference: struct (nullable) { // file reference (null for inline) + * external_path: string + * offset: long + * length: long + * managed: boolean + * } + * } + * </pre> + * + * <h3>Key Features:</h3> + * <ul> + * <li>Batches consecutive reads from the same file</li> + * <li>Configurable gap threshold for merging nearby reads</li> + * <li>Lookahead buffer to identify batch opportunities</li> + * <li>Preserves input row order in output</li> + * </ul> + * + * <h3>Usage Example:</h3> + * {{{ + * import org.apache.hudi.udf.BatchedByteRangeReader + * import org.apache.spark.sql.functions._ + * // Read a table with a blob column (e.g. image_data) + * val df = spark.read.format("hudi").load("/my_path").select("image_data", "record_id") + * + * // Read with batching (best when data is sorted by external_path, offset) + * val result = BatchedByteRangeReader.readBatched(df, structColName = "file_info") + * + * // Result has: image_data, record_id, data + * result.show() + * }}} + * + * <h3>Performance Tips:</h3> + * <ul> + * <li>Sort input by (blob.reference.external_path, blob.reference.offset) for maximum batching effectiveness</li> + * <li>Increase lookaheadSize for better batch detection (at cost of memory)</li> + * <li>Tune maxGapBytes based on your data access patterns</li> + * </ul> + * + * @param storage HoodieStorage instance for file I/O + * @param maxGapBytes Maximum gap between ranges to consider for batching (default: 4KB) + * @param lookaheadSize Number of rows to buffer for batch detection (default: 50) + */ +class BatchedBlobReader( + storage: HoodieStorage, + maxGapBytes: Int = 4096, + lookaheadSize: Int = 50) { + + 
private val logger = LoggerFactory.getLogger(classOf[BatchedBlobReader]) + + /** + * Process a partition iterator, batching consecutive reads. + * + * This method consumes the input iterator and produces an output iterator + * with each row containing the original data plus a "data" column with the + * bytes read from the file. + * + * @param rows Iterator of input rows with struct column + * @param structColIdx Index of the struct column in the row + * @param outputSchema Schema for output rows + * @param accessor Type class for accessing row fields + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Iterator of output rows with data column added + */ + def processPartition[R]( + rows: Iterator[R], + structColIdx: Int, + outputSchema: StructType) + (implicit accessor: RowAccessor[R], builder: RowBuilder[R]): Iterator[R] = { + + // Create buffered iterator for lookahead + val bufferedRows = rows.buffered + + // Result buffer to maintain order + val resultIterator = new Iterator[R] { + private var currentBatch: Iterator[R] = Iterator.empty + private var rowIndex = 0L + + override def hasNext: Boolean = { + if (currentBatch.hasNext) { + true + } else if (bufferedRows.hasNext) { + // Process next batch + currentBatch = processNextBatch() + currentBatch.hasNext + } else { + false + } + } + + override def next(): R = { + if (!hasNext) { + throw new NoSuchElementException("No more rows") + } + currentBatch.next() + } + + /** + * Collect and process the next batch of rows. 
+ */ + private def processNextBatch(): Iterator[R] = { + // Collect up to lookaheadSize rows with their original indices + val batch = collectBatch() + + if (batch.isEmpty) { + Iterator.empty + } else { + // Partition the batch into three groups + val (inlineRows, outOfLineRows) = batch.partition(_.inlineBytes.isDefined) + val (wholeFileRows, rangeRows) = outOfLineRows.partition(_.length < 0) + + // Case 1: Inline — return bytes directly without I/O + val inlineResults = inlineRows.map { ri => + RowResult(builder.buildRow(ri.originalRow, ri.inlineBytes.get, outputSchema), ri.index) + } + + // Case 2: Whole-file reads + val wholeFileResults = wholeFileRows.map(readWholeFile(_, outputSchema)) + + // Case 3: Regular range reads — merge consecutive ranges and batch + val mergedRanges = identifyConsecutiveRanges(rangeRows) + val rangeResults = mergedRanges.flatMap(readAndSplitRange(_, outputSchema)) + + // Sort by original index to preserve input order + (inlineResults ++ wholeFileResults ++ rangeResults).sortBy(_.index).map(_.row).iterator + } + } + + /** + * Collect up to lookaheadSize rows from the input iterator. 
+ */ + private def collectBatch(): Seq[RowInfo[R]] = { + val batch = ArrayBuffer[RowInfo[R]]() + var collected = 0 + + while (bufferedRows.hasNext && collected < lookaheadSize) { + val row = bufferedRows.next() + // Handle null struct column (null blob) + if (accessor.isNullAt(row, structColIdx)) { + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(null) + ) + rowIndex += 1 + collected += 1 + } else { + val blobStruct = accessor.getStruct(row, structColIdx, HoodieSchema.Blob.getFieldCount) + // Dispatch based on storage_type (field 0) + val storageType = accessor.getString(blobStruct, 0) + if (storageType == HoodieSchema.Blob.INLINE) { + // Case 1: Inline — bytes are in field 1 + val bytes = accessor.getBytes(blobStruct, 1) + batch += RowInfo[R]( + originalRow = row, + filePath = "", + offset = -1, + length = -1, + index = rowIndex, + inlineBytes = Some(bytes) + ) + } else { + // Case 2 or 3: Out-of-line — get reference struct (field 2) + val referenceStruct = accessor.getStruct(blobStruct, 2, HoodieSchema.Blob.getReferenceFieldCount) + val filePath = accessor.getString(referenceStruct, 0) + val offsetIsNull = accessor.isNullAt(referenceStruct, 1) + val lengthIsNull = accessor.isNullAt(referenceStruct, 2) + if (offsetIsNull || lengthIsNull) { + // Case 2: Whole-file read — no offset/length specified; sentinel length = -1 + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = 0, + length = -1, + index = rowIndex + ) + } else { + // Case 3: Regular range read + val offset = accessor.getLong(referenceStruct, 1) + val length = accessor.getLong(referenceStruct, 2) + batch += RowInfo[R]( + originalRow = row, + filePath = filePath, + offset = offset, + length = length, + index = rowIndex + ) + } + } + rowIndex += 1 + collected += 1 + } + } + batch.toSeq + } + } + + resultIterator + } + + /** + * Identify consecutive ranges that can be batched together. 
+ * + * This method groups rows by file path, sorts by offset, and merges + * ranges that are consecutive or within maxGapBytes of each other. + * + * @param rows Sequence of row information + * @return Sequence of merged ranges + */ + private def identifyConsecutiveRanges[R](rows: Seq[RowInfo[R]]): Seq[MergedRange[R]] = { + // Group by file path + val byFile = rows.groupBy(_.filePath) + + val allRanges = ArrayBuffer[MergedRange[R]]() + + byFile.foreach { case (filePath, fileRows) => + // Sort by offset + val sorted = fileRows.sortBy(_.offset) + + // Merge consecutive ranges + val merged = mergeRanges(sorted, maxGapBytes) + allRanges ++= merged + } + + allRanges.toSeq + } + + /** + * Merge consecutive ranges within the gap threshold. + * + * @param rows Sorted rows from the same file + * @param maxGap Maximum gap to consider for merging + * @return Sequence of merged ranges + */ + private def mergeRanges[R](rows: Seq[RowInfo[R]], maxGap: Int): Seq[MergedRange[R]] = { + + val result = ArrayBuffer[MergedRange[R]]() + var currentFilePath: String = null + var currentStartOffset: Long = 0L + var currentEndOffset: Long = 0L + var currentRows: ArrayBuffer[RowInfo[R]] = null + + rows.foreach { row => + if (currentRows == null) { + // Start first range + currentFilePath = row.filePath + currentStartOffset = row.offset + currentEndOffset = row.offset + row.length + currentRows = ArrayBuffer(row) + } else { + val gap = row.offset - currentEndOffset + // Check for overlap + if (row.offset < currentEndOffset) { + throw new IllegalArgumentException( + s"Overlapping blob ranges detected: previous range [${currentStartOffset}, ${currentEndOffset}) and current row [${row.offset}, ${row.offset + row.length}) in file ${row.filePath}" + ) + } + if (gap >= 0 && gap <= maxGap) { + // Merge into current range + currentEndOffset = math.max(currentEndOffset, row.offset + row.length) + currentRows += row + } else { + // Save current range and start new one + result += MergedRange[R]( + 
filePath = currentFilePath, + startOffset = currentStartOffset, + endOffset = currentEndOffset, + rows = currentRows.toSeq + ) + currentFilePath = row.filePath + currentStartOffset = row.offset + currentEndOffset = row.offset + row.length + currentRows = ArrayBuffer(row) + } + } + } + + // Add final range + if (currentRows != null) { + result += MergedRange[R]( + filePath = currentFilePath, + startOffset = currentStartOffset, + endOffset = currentEndOffset, + rows = currentRows.toSeq + ) + } + + result.toSeq + } + + /** + * Read an entire file and return it as a single row result. + * + * Used for whole-file out-of-line blobs where no offset or length is specified. + * + * @param rowInfo Row information with the file path + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence containing a single row result + */ + private def readWholeFile[R]( + rowInfo: RowInfo[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): RowResult[R] = { + + var inputStream: InputStream = null + try { + val path = new StoragePath(rowInfo.filePath) + inputStream = storage.open(path) + val buffer = inputStream.readAllBytes() + + logger.debug(s"Read entire file ${rowInfo.filePath} (${buffer.length} bytes)") + + RowResult(builder.buildRow(rowInfo.originalRow, buffer, outputSchema), rowInfo.index) + } finally { + if (inputStream != null) { + try { + inputStream.close() + } catch { + case e: Exception => + logger.warn(s"Error closing stream for ${rowInfo.filePath}", e) + } + } + } + } + + /** + * Read a merged range and split it back into individual row results. + * + * This method performs a single I/O operation to read the entire merged + * range, then splits the buffer into individual results for each original + * row. 
+ * + * @param range The merged range to read + * @param outputSchema Schema for output rows + * @param builder Type class for building output rows + * @tparam R Row type (Row or InternalRow) + * @return Sequence of row results with original indices + */ + private def readAndSplitRange[R]( + range: MergedRange[R], + outputSchema: StructType) + (implicit builder: RowBuilder[R]): Seq[RowResult[R]] = { + + var inputStream: SeekableDataInputStream = null + try { + // Get or open file handle + inputStream = storage.openSeekable(new StoragePath(range.filePath), false) + + // Seek to start offset + inputStream.seek(range.startOffset) + + // Read the entire merged range + val totalLength = (range.endOffset - range.startOffset).toInt + require(totalLength >= 0, s"Range too large: ${range.endOffset - range.startOffset} bytes exceeds Int.MaxValue") + val buffer = new Array[Byte](totalLength) + inputStream.readFully(buffer, 0, totalLength) + + logger.debug( + s"Read ${totalLength} bytes from ${range.filePath} at offset ${range.startOffset} " + + s"for ${range.rows.length} rows" + ) + + // Split buffer into individual results + range.rows.map { rowInfo => + val relativeOffset = (rowInfo.offset - range.startOffset).toInt + val data = buffer.slice(relativeOffset, relativeOffset + rowInfo.length.toInt) + + // Build output row using type class + val outputRow = builder.buildRow(rowInfo.originalRow, data, outputSchema) + + RowResult[R]( + row = outputRow, + index = rowInfo.index + ) + } + + } catch { + case e: Exception => + logger.error( + s"Failed to read range from ${range.filePath} " + + s"at offset ${range.startOffset}, length ${range.endOffset - range.startOffset}", + e + ) + throw e + } finally { + if (inputStream != null) { + try { + inputStream.close() + } catch { + case e: Exception => + logger.warn(s"Error closing input stream for ${range.filePath}", e) + } + } + } + } +} + +/** + * Type class for accessing row fields. 
+ * Abstracts over Row and InternalRow API differences. + * + * @tparam R Row type (Row or InternalRow) + */ +private[blob] trait RowAccessor[R] { + def getStruct(row: R, structColIdx: Int, numFields: Int): R + def getString(struct: R, fieldIdx: Int): String + def getLong(struct: R, fieldIdx: Int): Long + def getBytes(row: R, fieldIdx: Int): Array[Byte] + def isNullAt(row: R, fieldIdx: Int): Boolean +} + +/** + * Type class for building output rows. + * Abstracts over Row and InternalRow construction. + * + * @tparam R Row type (Row or InternalRow) + */ +private[blob] trait RowBuilder[R] { + def buildRow(originalRow: R, data: Array[Byte], outputSchema: StructType): R +} + +/** + * Type class instances for Row. + */ +private[blob] object RowAccessor { + implicit val rowAccessor: RowAccessor[Row] = new RowAccessor[Row] { + override def getStruct(row: Row, structColIdx: Int, numFields: Int): Row = row.getStruct(structColIdx) + override def getString(struct: Row, fieldIdx: Int): String = struct.getString(fieldIdx) + override def getLong(struct: Row, fieldIdx: Int): Long = struct.getLong(fieldIdx) + override def getBytes(row: Row, fieldIdx: Int): Array[Byte] = row.getAs[Array[Byte]](fieldIdx) + override def isNullAt(row: Row, fieldIdx: Int): Boolean = row.isNullAt(fieldIdx) + } + + implicit val internalRowAccessor: RowAccessor[InternalRow] = new RowAccessor[InternalRow] { + override def getStruct(row: InternalRow, structColIdx: Int, numFields: Int): InternalRow = row.getStruct(structColIdx, numFields) + override def getString(struct: InternalRow, fieldIdx: Int): String = struct.getUTF8String(fieldIdx).toString + override def getLong(struct: InternalRow, fieldIdx: Int): Long = struct.getLong(fieldIdx) + override def getBytes(row: InternalRow, fieldIdx: Int): Array[Byte] = row.getBinary(fieldIdx) + override def isNullAt(row: InternalRow, fieldIdx: Int): Boolean = row.isNullAt(fieldIdx) + } +} + +/** + * Type class instances for Row builders. 
+ */ +private[blob] object RowBuilder { + implicit val rowBuilder: RowBuilder[Row] = new RowBuilder[Row] { + override def buildRow(originalRow: Row, data: Array[Byte], outputSchema: StructType): Row = { + val outputValues = new Array[Any](originalRow.length + 1) + var i = 0 + while (i < originalRow.length) { + outputValues(i) = originalRow.get(i) + i += 1 + } + outputValues(originalRow.length) = data + new GenericRowWithSchema(outputValues, outputSchema) + } + } + + implicit val internalRowBuilder: RowBuilder[InternalRow] = new RowBuilder[InternalRow] { + override def buildRow(originalRow: InternalRow, data: Array[Byte], outputSchema: StructType): InternalRow = { + val outputRow = new SpecificInternalRow(outputSchema.fields.map(_.dataType)) + var i = 0 + while (i < originalRow.numFields) { + if (originalRow.isNullAt(i)) { + outputRow.setNullAt(i) + } else { + val dataType = outputSchema.fields(i).dataType + // Copy field using generic get/update for compatibility + outputRow.update(i, originalRow.get(i, dataType)) + } + i += 1 + } + // Set the data field (last position) + outputRow.update(originalRow.numFields, data) + outputRow + } + } +} + +/** + * Information about a single row to be read. + * + * @param originalRow Original input row + * @param filePath Path to the file + * @param offset Byte offset in file + * @param length Number of bytes to read + * @param index Original position in input (for ordering) + * @tparam R Row type (Row or InternalRow) + */ +private case class RowInfo[R]( + originalRow: R, + filePath: String, + offset: Long, + length: Long, + index: Long, + inlineBytes: Option[Array[Byte]] = None) + +/** + * A merged range combining multiple consecutive reads. 
+ * + * @param filePath Path to the file + * @param startOffset Start byte offset of merged range + * @param endOffset End byte offset of merged range (exclusive) + * @param rows Individual rows included in this range + * @tparam R Row type (Row or InternalRow) + */ +private case class MergedRange[R]( + filePath: String, + startOffset: Long, + endOffset: Long, + rows: Seq[RowInfo[R]]) { +} + +/** + * Result row with its original index for ordering. + * + * @param row Output row with data + * @param index Original position in input + * @tparam R Row type (Row or InternalRow) + */ +private case class RowResult[R]( + row: R, + index: Long) + +/** + * Companion object providing the main API for batched byte range reading. + */ +object BatchedBlobReader { + + val MAX_GAP_BYTES_CONF = "hoodie.blob.batching.max.gap.bytes" + val LOOKAHEAD_SIZE_CONF = "hoodie.blob.batching.lookahead.size" + + /** Default maximum gap to consider for batching */ + val DEFAULT_MAX_GAP_BYTES = 4096 + + /** Default lookahead buffer size */ + val DEFAULT_LOOKAHEAD_SIZE = 50 Review Comment: [nit] Does this lookahead mean the number of bytes to look ahead or the number of rows? Should we rename the variable here to be more clear? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
