luoyuxia commented on code in PR #2933:
URL: https://github.com/apache/fluss/pull/2933#discussion_r3013796528
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java:
##########
@@ -135,6 +136,11 @@ public Map<TableBucket, List<ScanRecord>> collectFetch(final LogFetchBuffer logF
recordsRemaining -= records.size();
}
+
+ // Only count bytes when the fetch is fully consumed
+ if (nextInLineFetch.isConsumed()) {
Review Comment:
IIUC, `totalBytesRead` here is not an accurate per-`ScanRecords` measurement.
Because bytes are only counted once a fetch is fully consumed, some
`ScanRecords` may return records with `totalBytesRead == 0`, while others carry
the full size of a completed fetch and look over-counted relative to the
records returned in that poll.
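To illustrate the concern, here is a minimal, self-contained sketch that does not use any Fluss classes. `PollAccountingSketch`, `countOnConsumed`, and `countPerRecord` are hypothetical names, and it assumes a single fetch that is drained over several polls with a per-poll record limit. The per-record variant is only one possible alternative; whether per-record sizes are available at that point in the scanner is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one completed fetch drained over several polls; not Fluss code.
class PollAccountingSketch {

    /** Bytes reported per poll when the whole fetch size is added only on the consuming poll. */
    static List<Long> countOnConsumed(int[] recordSizes, int maxPollRecords) {
        long fetchBytes = 0;
        for (int size : recordSizes) {
            fetchBytes += size;
        }
        List<Long> perPoll = new ArrayList<>();
        for (int pos = 0; pos < recordSizes.length; pos += maxPollRecords) {
            int end = Math.min(pos + maxPollRecords, recordSizes.length);
            boolean consumed = end == recordSizes.length;
            // Earlier polls report 0; the last poll reports the full fetch size.
            perPoll.add(consumed ? fetchBytes : 0L);
        }
        return perPoll;
    }

    /** Bytes reported per poll when each returned record contributes its own size. */
    static List<Long> countPerRecord(int[] recordSizes, int maxPollRecords) {
        List<Long> perPoll = new ArrayList<>();
        for (int pos = 0; pos < recordSizes.length; pos += maxPollRecords) {
            int end = Math.min(pos + maxPollRecords, recordSizes.length);
            long bytes = 0;
            for (int i = pos; i < end; i++) {
                bytes += recordSizes[i];
            }
            perPoll.add(bytes);
        }
        return perPoll;
    }
}
```

With record sizes `{10, 20, 30, 40, 50}` and a limit of 2 records per poll, `countOnConsumed` yields `[0, 0, 150]` while `countPerRecord` yields `[30, 70, 50]`. Both sum to 150, so the aggregate metric is unaffected; only the per-batch attribution differs.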
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -502,10 +514,15 @@ private TableBucketWriteResultWithSplitIds forSnapshotSplitRecords(
LakeWriter<WriteResult> lakeWriter =
getOrCreateLakeWriter(
bucket,
checkNotNull(currentSnapshotSplit).getPartitionName());
+ long bytesRead = 0;
while (recordIterator.hasNext()) {
ScanRecord scanRecord = recordIterator.next().record();
lakeWriter.write(scanRecord);
+ InternalRow row = scanRecord.getRow();
+ // Snapshot path always produces BinaryRow (CompactedRow/IndexedRow).
Review Comment:
TieringSplitReader’s snapshot-path byte accounting will throw
ClassCastException as soon as snapshot rows are projected or schema-remapped.
SnapshotFilesReader.next() can legitimately return a ProjectedRow, but
TieringSplitReader.forSnapshotSplitRecords() unconditionally casts every
InternalRow to BinaryRow. That makes the new metric path unsafe for
evolved-schema snapshots or any snapshot scan with projectedFields.
```
// SnapshotFilesReader
BinaryValue originValue = valueDecoder.decodeValue(value);
InternalRow originRow = originValue.row;
if (targetSchemaId != originValue.schemaId) {
    int[] indexMapping =
            schemaProjectionCache.computeIfAbsent(
                    originValue.schemaId,
                    sourceSchemaId ->
                            SchemaUtil.getIndexMapping(
                                    schemaGetter.getSchema(sourceSchemaId),
                                    targetSchema));
    originRow = ProjectedRow.from(indexMapping).replaceRow(originRow);
}
```
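One way to make the byte accounting tolerant of non-binary rows is an `instanceof` guard in place of the unconditional cast. The sketch below is only an illustration: the nested `InternalRow`, `BinaryRow`, and `ProjectedRow` types are simplified stand-ins defined inside the sketch, not the real Fluss classes, and `sizeOrZero` and the `getSizeInBytes()` accessor on the stand-in are assumptions.

```java
// Hypothetical sketch; the nested types are stand-ins, not the real Fluss row classes.
class SnapshotBytesSketch {

    interface InternalRow {}

    interface BinaryRow extends InternalRow {
        int getSizeInBytes(); // assumed size accessor on the stand-in
    }

    /** Stand-in for a projected view that wraps another row and is not itself binary. */
    static final class ProjectedRow implements InternalRow {
        final InternalRow delegate;

        ProjectedRow(InternalRow delegate) {
            this.delegate = delegate;
        }
    }

    /** Returns the row's size when it is binary, or 0 when the size is unknown. */
    static long sizeOrZero(InternalRow row) {
        if (row instanceof BinaryRow) {
            return ((BinaryRow) row).getSizeInBytes();
        }
        // An unconditional (BinaryRow) cast would throw ClassCastException here.
        return 0L;
    }
}
```

Returning 0 for non-binary rows avoids the exception but under-counts projected or schema-remapped rows, so the metric would then be a lower bound rather than an exact figure.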
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/ScanRecords.java:
##########
@@ -37,12 +37,27 @@
*/
@PublicEvolving
public class ScanRecords implements Iterable<ScanRecord> {
- public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap());
+ public static final ScanRecords EMPTY = new ScanRecords(Collections.emptyMap(), 0);
private final Map<TableBucket, List<ScanRecord>> records;
+ private final long totalBytesRead;
public ScanRecords(Map<TableBucket, List<ScanRecord>> records) {
+ this(records, 0);
+ }
+
+ public ScanRecords(Map<TableBucket, List<ScanRecord>> records, long totalBytesRead) {
this.records = records;
+ this.totalBytesRead = totalBytesRead;
+ }
+
+ /**
+ * Get the total bytes read from the Fluss log in this batch.
+ *
+ * @return the total bytes read
+ */
+ public long getTotalBytesRead() {
Review Comment:
```suggestion
public long getTotalBytes() {
```
nit: I think we can drop the `Read` suffix, since `ScanRecords` already
implies read/scan.