leekeiabstraction commented on code in PR #2933:
URL: https://github.com/apache/fluss/pull/2933#discussion_r3018641743


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java:
##########
@@ -153,7 +155,17 @@ public boolean hasNext() {
 
         @Override
         public ScanRecord next() {
-            return new ScanRecord(rowIterator.next());
+            InternalRow row = rowIterator.next();
+            int sizeInBytes = -1;
+            if (row instanceof ProjectedRow) {
+                InternalRow underlyingRow = ((ProjectedRow) row).getUnderlyingRow();
+                if (underlyingRow instanceof BinaryRow) {
+                    sizeInBytes = ((BinaryRow) underlyingRow).getSizeInBytes();
+                }
+            } else if (row instanceof BinaryRow) {
+                sizeInBytes = ((BinaryRow) row).getSizeInBytes();
+            }

Review Comment:
   Do we expect any other row types here? If not, it might be better to throw
   an exception, or to log at debug level given that this is a hot path,
   rather than silently leaving `sizeInBytes` at -1.
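
   A rough sketch of the two options, not a drop-in patch. The row types
   below are minimal stand-ins written for this example (the real Fluss
   interfaces are much larger), `RowSizes` and its method names are
   hypothetical, and `java.util.logging` is used only to keep the sketch
   dependency-free.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-ins for the Fluss row types, reduced to what this sketch needs.
interface InternalRow {}

interface BinaryRow extends InternalRow {
    int getSizeInBytes();
}

final class ProjectedRow implements InternalRow {
    private final InternalRow underlyingRow;

    ProjectedRow(InternalRow underlyingRow) {
        this.underlyingRow = underlyingRow;
    }

    InternalRow getUnderlyingRow() {
        return underlyingRow;
    }
}

// Hypothetical helper; not part of the PR.
final class RowSizes {
    static final int UNKNOWN_SIZE = -1;
    private static final Logger LOG = Logger.getLogger(RowSizes.class.getName());

    private RowSizes() {}

    // Unwraps one level of projection, mirroring the branches in the diff.
    private static InternalRow unwrap(InternalRow row) {
        return row instanceof ProjectedRow ? ((ProjectedRow) row).getUnderlyingRow() : row;
    }

    private static String describe(InternalRow row) {
        return row == null ? "null" : row.getClass().getName();
    }

    // Option 1: fail fast when a row type shows up that nobody anticipated.
    static int sizeInBytesStrict(InternalRow row) {
        InternalRow target = unwrap(row);
        if (target instanceof BinaryRow) {
            return ((BinaryRow) target).getSizeInBytes();
        }
        throw new IllegalStateException("Unexpected row type: " + describe(target));
    }

    // Option 2: keep the -1 fallback, but leave a trace at debug (FINE) level.
    // The isLoggable guard avoids building the message on the hot path.
    static int sizeInBytesOrUnknown(InternalRow row) {
        InternalRow target = unwrap(row);
        if (target instanceof BinaryRow) {
            return ((BinaryRow) target).getSizeInBytes();
        }
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Cannot determine size of row type " + describe(target));
        }
        return UNKNOWN_SIZE;
    }
}
```

   Which variant fits depends on whether a non-binary row can legitimately
   reach this reader; the strict one only makes sense if it cannot.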



##########
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchCollectorTest.java:
##########
@@ -147,14 +145,51 @@ void testCollectAfterUnassign() throws Exception {
         // unassign bucket 2
         logScannerStatus.unassignScanBuckets(Collections.singletonList(tb2));
 
-        Map<TableBucket, List<ScanRecord>> bucketAndRecords =
-                logFetchCollector.collectFetch(logFetchBuffer);
+        ScanRecords bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
         // should only contain records for bucket 1
-        assertThat(bucketAndRecords.keySet()).containsExactly(tb1);
+        assertThat(bucketAndRecords.buckets()).containsExactly(tb1);
 
         // collect again, should be empty
         bucketAndRecords = logFetchCollector.collectFetch(logFetchBuffer);
-        assertThat(bucketAndRecords.size()).isEqualTo(0);
+        assertThat(bucketAndRecords.buckets().size()).isEqualTo(0);
+    }
+
+    @Test
+    void testTotalBytesRead() throws Exception {
+        TableBucket tb1 = new TableBucket(DATA1_TABLE_ID, 1L, 1);
+        TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1L, 2);
+        Map<TableBucket, Long> scanBuckets = new HashMap<>();
+        scanBuckets.put(tb1, 0L);
+        scanBuckets.put(tb2, 0L);
+        logScannerStatus.assignScanBuckets(scanBuckets);
+
+        CompletedFetch completedFetch1 =
+                makeCompletedFetch(
+                        tb1,
+                        new FetchLogResultForBucket(tb1, genMemoryLogRecordsByObject(DATA1), 10L),
+                        0L);
+        CompletedFetch completedFetch2 =
+                makeCompletedFetch(
+                        tb2,
+                        new FetchLogResultForBucket(tb2, genMemoryLogRecordsByObject(DATA1), 10L),
+                        0L);
+
+        logFetchBuffer.add(completedFetch1);
+        logFetchBuffer.add(completedFetch2);
+
+        ScanRecords scanRecords = logFetchCollector.collectFetch(logFetchBuffer);
+
+        // Both fetches should be fully consumed
+        assertThat(completedFetch1.isConsumed()).isTrue();
+        assertThat(completedFetch2.isConsumed()).isTrue();
+
+        // totalBytesRead should be the sum of both completed fetches
+        long totalBytesRead = 0;
+        for (ScanRecord record : scanRecords) {
+            assertThat(record.getSizeInBytes()).isGreaterThan(0);
+            totalBytesRead += record.getSizeInBytes();
+        }
+        assertThat(totalBytesRead).isGreaterThan(0);

Review Comment:
   Could we use a more precise assertion here, e.g. checking the exact
   expected byte count instead of `isGreaterThan(0)`?
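
   To illustrate the idea only: compute the expected total independently of
   the code under test and compare for equality. `SizedRecord` and
   `TotalBytes` are hypothetical stand-ins for this example, and the sizes
   are made up; in the real test the expected value would have to come from
   the test data itself.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ScanRecord: only the size accessor matters here.
final class SizedRecord {
    private final int sizeInBytes;

    SizedRecord(int sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
    }

    int getSizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical helper; not part of the PR.
final class TotalBytes {
    private TotalBytes() {}

    // Sums the per-record sizes, like the loop in the test under review.
    static long totalBytesRead(Iterable<SizedRecord> records) {
        long total = 0;
        for (SizedRecord record : records) {
            total += record.getSizeInBytes();
        }
        return total;
    }

    // Exact check: fails on any mismatch, not only when the total is zero.
    static void assertExactTotal(Iterable<SizedRecord> records, long expectedBytes) {
        long actual = totalBytesRead(records);
        if (actual != expectedBytes) {
            throw new AssertionError("expected " + expectedBytes + " bytes but read " + actual);
        }
    }
}
```

   An equality check like this would catch a record being double counted or
   dropped, which `isGreaterThan(0)` would not.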


