Copilot commented on code in PR #2809:
URL: https://github.com/apache/fluss/pull/2809#discussion_r2904335415


##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -288,6 +288,43 @@ message LimitScanResponse{
 }
 
 
+// Full KV scan request and response.
+// A new scan is initiated with bucket_scan_req; subsequent batches use scanner_id.
+message PbScanReqForBucket {
+  required int64 table_id = 1;
+  optional int64 partition_id = 2;
+  required int32 bucket_id = 3;
+  // If set, stops returning rows after this many records.
+  optional int64 limit = 4;
+}
+
+message ScanKvRequest {
+  // Mutually exclusive: either scanner_id (continuation) or bucket_scan_req (new scan).
+  optional bytes scanner_id = 1;
+  optional PbScanReqForBucket bucket_scan_req = 2;
+  // Monotonically increasing sequence number for in-order delivery validation.
+  optional int32 call_seq_id = 3;
+  // Maximum number of bytes of record data to return in this batch.
+  required int32 batch_size_bytes = 4;

Review Comment:
   `batch_size_bytes` is `required`, but the client sends its close request
without setting it (see `KvBatchScanner.close()`), and the server reads it
without a presence check on every non-close request. As written, `close_scanner`
requests are either invalid by construction or silently carry a value of 0, and
a `batch_size_bytes` of 0 on a regular request can lead to a non-progressing
scan. Consider making `batch_size_bytes` optional with a default, or modeling
the request as a `oneof` (e.g., `open/continue/close`) so close requests don’t
require batch sizing.
   ```suggestion
     optional int32 batch_size_bytes = 4;
   ```
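
   To illustrate the `open/continue/close` alternative, here is a minimal
sketch in plain Java rather than proto. `ScanOp` and its nested types are
hypothetical names, not generated protobuf classes; the point is only that a
close operation has no batch size field to forget or default.

   ```java
   // Hypothetical Java model of an open/continue/close split. The names are
   // illustrative and are not generated protobuf classes.
   abstract class ScanOp {
       private ScanOp() {}

       /** Starts a new scan on one bucket; carries a batch size. */
       static final class Open extends ScanOp {
           final long tableId;
           final int bucketId;
           final int batchSizeBytes;

           Open(long tableId, int bucketId, int batchSizeBytes) {
               this.tableId = tableId;
               this.bucketId = bucketId;
               this.batchSizeBytes = batchSizeBytes;
           }
       }

       /** Fetches the next batch of an existing session; carries a batch size. */
       static final class Continue extends ScanOp {
           final byte[] scannerId;
           final int batchSizeBytes;

           Continue(byte[] scannerId, int batchSizeBytes) {
               this.scannerId = scannerId;
               this.batchSizeBytes = batchSizeBytes;
           }
       }

       /** Releases the session; there is no batch size to set. */
       static final class Close extends ScanOp {
           final byte[] scannerId;

           Close(byte[] scannerId) {
               this.scannerId = scannerId;
           }
       }

       /** Names the operation a request represents. */
       static String kind(ScanOp op) {
           if (op instanceof Open) {
               return "open";
           }
           if (op instanceof Continue) {
               return "continue";
           }
           return "close";
       }
   }
   ```

   Making the field `optional`, as in the suggestion above, is the smaller
change; the split only pays off if the protocol is still free to change.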



##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java:
##########
@@ -242,10 +245,15 @@ public enum Errors {
     REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new),
     NO_REBALANCE_IN_PROGRESS_EXCEPTION(
             62, "No rebalance task in progress.", NoRebalanceInProgressException::new),
-    INVALID_PRODUCER_ID_EXCEPTION(
-            63,
-            "The client has attempted to perform an operation with an invalid producer ID.",
-            InvalidProducerIdException::new);
+    SCANNER_EXPIRED(
+            64, "The scanner session has expired due to inactivity.", ScannerExpiredException::new),
+    UNKNOWN_SCANNER_ID(
+            65, "The scanner id is not recognized by the server.", UnknownScannerIdException::new),
+    INVALID_SCAN_REQUEST(66, "The scan request is invalid.", InvalidScanRequestException::new),
+    TOO_MANY_SCANNERS(
+            67,
+            "The per-bucket or per-server scanner session limit has been reached.",
+            TooManyScannersException::new);

Review Comment:
   This change appears to remove the previously existing
`INVALID_PRODUCER_ID_EXCEPTION(63, ...)` entry: the new entries start at 64, so
code 63 is left unmapped. Error codes are typically a wire-compatibility
contract; removing a code will cause older clients/servers that emit/expect
code 63 to map it incorrectly (likely to `UNKNOWN_SERVER_ERROR`). Consider
restoring `INVALID_PRODUCER_ID_EXCEPTION` with its original code and appending
the new scan-related errors with new, unused codes.
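
   A minimal sketch of that append-only convention, assuming a simplified
stand-in enum: `ScanErrorCode`, its `forCode` helper and the `-1` fallback code
are hypothetical and do not reflect the real `Errors` class. Code 63 keeps its
meaning, the scan errors take 64 to 67, and a duplicate code fails at
class-load time.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified stand-in for the real Errors enum.
   enum ScanErrorCode {
       UNKNOWN_SERVER_ERROR(-1), // assumed fallback code
       NO_REBALANCE_IN_PROGRESS(62),
       INVALID_PRODUCER_ID(63), // existing entry kept at its original code
       SCANNER_EXPIRED(64),
       UNKNOWN_SCANNER_ID(65),
       INVALID_SCAN_REQUEST(66),
       TOO_MANY_SCANNERS(67);

       private static final Map<Integer, ScanErrorCode> BY_CODE = new HashMap<>();

       static {
           for (ScanErrorCode e : values()) {
               // Fail fast if two entries ever share a wire code.
               if (BY_CODE.put(e.code, e) != null) {
                   throw new IllegalStateException("Duplicate error code " + e.code);
               }
           }
       }

       private final int code;

       ScanErrorCode(int code) {
           this.code = code;
       }

       int code() {
           return code;
       }

       /** Maps a wire code to its entry; unmapped codes fall back to UNKNOWN_SERVER_ERROR. */
       static ScanErrorCode forCode(int code) {
           ScanErrorCode e = BY_CODE.get(code);
           return e != null ? e : UNKNOWN_SERVER_ERROR;
       }
   }
   ```

   With the entry for 63 removed, `forCode(63)` would fall through to the
fallback, which is the mis-mapping described above.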



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -422,6 +433,114 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> 
notifyLakeTableOffset(
         return response;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        ScanKvResponse response = new ScanKvResponse();
+        try {
+            ScannerContext context;
+
+            if (request.hasBucketScanReq()) {
+                // New scan: open a fresh scanner session
+                PbScanReqForBucket bucketReq = request.getBucketScanReq();
+                long tableId = bucketReq.getTableId();
+                authorizeTable(READ, tableId);

Review Comment:
   The handler does not enforce the documented mutual exclusivity of
`bucket_scan_req` and `scanner_id`. If both are set, it silently treats the
request as a new scan and ignores `scanner_id`, which can create duplicate
sessions. Also, `batch_size_bytes` is read without validation on every
non-close request, after a `ScannerContext` has been obtained; if the value is
0 (or missing), the scan can become non-progressing (empty batches with
`has_more_results=true` and no cursor advance). Recommended: (1) reject
requests that set both fields; (2) validate `batch_size_bytes > 0` for
non-close requests (and/or guarantee at least one record per response); (3)
structure validation so that `close_scanner` requests succeed even when
`batch_size_bytes` is absent.
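
   The three recommendations can be sketched as a single up-front check.
This is an illustration under assumptions: `ScanRequestView` is a hypothetical
plain-Java view of the request (the real class is generated from the proto),
and `IllegalArgumentException` stands in for `InvalidScanRequestException`.

   ```java
   // Hypothetical plain-Java view of a ScanKvRequest.
   final class ScanRequestView {
       final boolean hasScannerId;
       final boolean hasBucketScanReq;
       final boolean closeScanner;
       final Integer batchSizeBytes; // null when the field is absent

       ScanRequestView(
               boolean hasScannerId,
               boolean hasBucketScanReq,
               boolean closeScanner,
               Integer batchSizeBytes) {
           this.hasScannerId = hasScannerId;
           this.hasBucketScanReq = hasBucketScanReq;
           this.closeScanner = closeScanner;
           this.batchSizeBytes = batchSizeBytes;
       }
   }

   final class ScanRequestValidator {
       private ScanRequestValidator() {}

       /** Throws IllegalArgumentException for requests the handler should reject. */
       static void validate(ScanRequestView req) {
           // (1) Exactly one of the two fields must be present.
           if (req.hasScannerId == req.hasBucketScanReq) {
               throw new IllegalArgumentException(
                       "Exactly one of scanner_id or bucket_scan_req must be set.");
           }
           // (3) A close request needs no batch size, so stop here.
           if (req.closeScanner) {
               return;
           }
           // (2) Every other request must ask for a positive batch size.
           if (req.batchSizeBytes == null || req.batchSizeBytes <= 0) {
               throw new IllegalArgumentException(
                       "batch_size_bytes must be > 0 for non-close requests.");
           }
       }
   }
   ```

   Running the check before any session is created or looked up also keeps a
rejected request from leaving a scanner session behind.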



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -422,6 +433,114 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> 
notifyLakeTableOffset(
         return response;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        ScanKvResponse response = new ScanKvResponse();
+        try {
+            ScannerContext context;
+
+            if (request.hasBucketScanReq()) {
+                // New scan: open a fresh scanner session
+                PbScanReqForBucket bucketReq = request.getBucketScanReq();
+                long tableId = bucketReq.getTableId();
+                authorizeTable(READ, tableId);
+
+                TableBucket tableBucket =
+                        new TableBucket(
+                                tableId,
+                                bucketReq.hasPartitionId() ? 
bucketReq.getPartitionId() : null,
+                                bucketReq.getBucketId());
+                Long limit = bucketReq.hasLimit() ? bucketReq.getLimit() : 
null;
+
+                context =
+                        scannerManager.createScanner(
+                                replicaManager.getLeaderKvTablet(tableBucket), 
tableBucket, limit);
+
+                if (context == null) {
+                    // Bucket is empty — return an empty response immediately 
without registering a
+                    // session.
+                    response.setHasMoreResults(false);
+                    return CompletableFuture.completedFuture(response);
+                }
+            } else {
+                if (!request.hasScannerId()) {
+                    throw new InvalidScanRequestException(
+                            "ScanKvRequest must have either bucket_scan_req 
(new scan) "
+                                    + "or scanner_id (continuation).");
+                }

Review Comment:
   The handler does not enforce the documented mutual exclusivity of
`bucket_scan_req` and `scanner_id`. If both are set, it silently treats the
request as a new scan and ignores `scanner_id`, which can create duplicate
sessions. Also, `batch_size_bytes` is read without validation on every
non-close request, after a `ScannerContext` has been obtained; if the value is
0 (or missing), the scan can become non-progressing (empty batches with
`has_more_results=true` and no cursor advance). Recommended: (1) reject
requests that set both fields; (2) validate `batch_size_bytes > 0` for
non-close requests (and/or guarantee at least one record per response); (3)
structure validation so that `close_scanner` requests succeed even when
`batch_size_bytes` is absent.



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -422,6 +433,114 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> 
notifyLakeTableOffset(
         return response;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        ScanKvResponse response = new ScanKvResponse();
+        try {
+            ScannerContext context;
+
+            if (request.hasBucketScanReq()) {
+                // New scan: open a fresh scanner session
+                PbScanReqForBucket bucketReq = request.getBucketScanReq();
+                long tableId = bucketReq.getTableId();
+                authorizeTable(READ, tableId);
+
+                TableBucket tableBucket =
+                        new TableBucket(
+                                tableId,
+                                bucketReq.hasPartitionId() ? 
bucketReq.getPartitionId() : null,
+                                bucketReq.getBucketId());
+                Long limit = bucketReq.hasLimit() ? bucketReq.getLimit() : 
null;
+
+                context =
+                        scannerManager.createScanner(
+                                replicaManager.getLeaderKvTablet(tableBucket), 
tableBucket, limit);
+
+                if (context == null) {
+                    // Bucket is empty — return an empty response immediately 
without registering a
+                    // session.
+                    response.setHasMoreResults(false);
+                    return CompletableFuture.completedFuture(response);
+                }
+            } else {
+                if (!request.hasScannerId()) {
+                    throw new InvalidScanRequestException(
+                            "ScanKvRequest must have either bucket_scan_req 
(new scan) "
+                                    + "or scanner_id (continuation).");
+                }
+                byte[] scannerId = request.getScannerId();
+                context = scannerManager.getScanner(scannerId);
+                if (context == null) {
+                    String msg =
+                            scannerManager.isRecentlyExpired(scannerId)
+                                    ? "Scanner session has expired due to 
inactivity. "
+                                            + "Please start a new scan."
+                                    : "Unknown scanner ID. The session may 
have expired or "
+                                            + "never existed.";
+                    throw new UnknownScannerIdException(msg);
+                }
+                // Validate call-sequence ordering to detect duplicate or 
out-of-order requests.
+                // getScanner() already refreshed the last-access timestamp.
+                if (request.hasCallSeqId()) {
+                    int expectedSeqId = context.getCallSeqId() + 1;
+                    int requestSeqId = request.getCallSeqId();
+                    if (requestSeqId != expectedSeqId) {
+                        throw new InvalidScanRequestException(
+                                String.format(
+                                        "Out-of-order scan request: expected 
callSeqId=%d but got %d.",
+                                        expectedSeqId, requestSeqId));
+                    }
+                }
+            }
+
+            // Handle explicit close request
+            if (request.hasCloseScanner() && request.isCloseScanner()) {
+                scannerManager.removeScanner(context);
+                response.setScannerId(context.getScannerId());
+                response.setHasMoreResults(false);
+                return CompletableFuture.completedFuture(response);
+            }
+
+            // Build the next batch
+            int batchSizeBytes = request.getBatchSizeBytes();

Review Comment:
   The handler does not enforce the documented mutual exclusivity of
`bucket_scan_req` and `scanner_id`. If both are set, it silently treats the
request as a new scan and ignores `scanner_id`, which can create duplicate
sessions. Also, `batch_size_bytes` is read without validation on every
non-close request, after a `ScannerContext` has been obtained; if the value is
0 (or missing), the scan can become non-progressing (empty batches with
`has_more_results=true` and no cursor advance). Recommended: (1) reject
requests that set both fields; (2) validate `batch_size_bytes > 0` for
non-close requests (and/or guarantee at least one record per response); (3)
structure validation so that `close_scanner` requests succeed even when
`batch_size_bytes` is absent.
   ```suggestion
               int batchSizeBytes = request.getBatchSizeBytes();
               if (batchSizeBytes <= 0) {
                   throw new InvalidScanRequestException(
                           "ScanKvRequest must specify batch_size_bytes > 0 "
                                   + "for non-close requests.");
               }
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.DefaultValueRecordBatch;
+import org.apache.fluss.record.ValueRecord;
+import org.apache.fluss.record.ValueRecordReadContext;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.PbScanReqForBucket;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.SchemaUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link BatchScanner} that streams all live rows from a single KV bucket 
by iterating the tablet
+ * server's RocksDB instance via a sequence of ScanKv RPCs.
+ *
+ * <p>The remote scanner is opened lazily on the first {@link #pollBatch} 
call. Once the bucket is
+ * exhausted the scanner closes itself on the server. If the caller needs to 
abort early it must
+ * call {@link #close} explicitly.
+ *
+ * <p>After each response with {@code has_more_results = true} the next RPC is 
fired immediately,
+ * overlapping network latency with the caller's row processing. At most one 
request is in-flight at
+ * any time.
+ *
+ * <p>Not reusable and not thread-safe.
+ */
+public class KvBatchScanner implements BatchScanner {
+    private static final int BATCH_SIZE_BYTES = 4 * 1024 * 1024;
+
+    private final TablePath tablePath;
+    private final TableBucket tableBucket;
+    private final SchemaGetter schemaGetter;
+    private final MetadataUpdater metadataUpdater;
+    private final int targetSchemaId;
+
+    /** Cache for schema evolution index mappings. */
+    private final Map<Short, int[]> schemaMappingCache = new HashMap<>();
+
+    /** Reused across all batches; schemaGetter and kvFormat never change. */
+    private final ValueRecordReadContext readContext;
+
+    private boolean done = false;
+    @Nullable private TabletServerGateway gateway;
+    @Nullable private byte[] scannerId;
+
+    /** Monotonically increasing ID sent with each continuation request, 
starting at 0. */
+    private int callSeqId = 0;
+
+    @Nullable private CompletableFuture<ScanKvResponse> prefetchFuture;
+
+    public KvBatchScanner(
+            TableInfo tableInfo,
+            TableBucket tableBucket,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater) {
+        this.tablePath = tableInfo.getTablePath();
+        this.tableBucket = tableBucket;
+        this.schemaGetter = schemaGetter;
+        this.metadataUpdater = metadataUpdater;
+        this.targetSchemaId = tableInfo.getSchemaId();
+        this.readContext =
+                ValueRecordReadContext.createReadContext(
+                        schemaGetter, 
tableInfo.getTableConfig().getKvFormat());
+    }
+
+    /**
+     * Returns the next batch of rows.
+     *
+     * <ul>
+     *   <li>Returns an empty iterator if the in-flight RPC has not completed 
within {@code
+     *       timeout}.
+     *   <li>Returns {@code null} when the bucket is exhausted or the scanner 
has been closed.
+     * </ul>
+     */
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws 
IOException {
+        if (done) {
+            return null;
+        }
+        if (prefetchFuture == null) {
+            try {
+                openScanner();
+            } catch (Exception e) {
+                done = true;
+                // TODO: handle LeaderNotAvailableException with retry (see 
LimitBatchScanner).
+                throw new IOException("Failed to open scanner for bucket " + 
tableBucket, e);
+            }
+        }
+
+        ScanKvResponse response;
+        try {
+            response = prefetchFuture.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            return CloseableIterator.emptyIterator();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        prefetchFuture = null;
+

Review Comment:
   Two client-side issues: (1) `close()` sends a `ScanKvRequest` without
setting `batch_size_bytes`, but the proto marks it as `required`, and the
server reads it; this can make close requests invalid or default to 0. (2)
`pollBatch()` never checks `error_code`/`error_message` and treats error
responses as successful batches; it should translate non-zero error codes into
an exception (and potentially trigger a metadata refresh or retry for
retriable errors).
   ```suggestion
   
           // Check for server-side errors before interpreting the response as
           // a valid batch.
           if (response.hasErrorCode() && response.getErrorCode() != 0) {
               done = true;
               String message =
                       response.hasErrorMessage()
                               ? response.getErrorMessage()
                               : "unknown error";
               throw new IOException(
                       "ScanKvRequest for bucket "
                               + tableBucket
                               + " failed with error_code="
                               + response.getErrorCode()
                               + ", error_message="
                               + message);
           }
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableKvScan.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.table.scanner.batch.KvBatchScanner;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link KvScan} that discovers all buckets across all 
partitions and chains
+ * {@link KvBatchScanner} instances to produce a unified lazy iterator.
+ *
+ * <p>Bucket list is built eagerly at {@link #execute()} time; for partitioned 
tables partition
+ * metadata is refreshed from the server before building the list. Individual 
{@link KvBatchScanner}
+ * instances are created lazily one at a time as each bucket is consumed.
+ */
+public class TableKvScan implements KvScan {
+
+    /**
+     * Short poll timeout used inside the iterator's {@code hasNext()} loop. 
This keeps the busy
+     * loop responsive while still allowing the prefetch to arrive.
+     *
+     * <p>TODO: make this configurable via a client config option.
+     */
+    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10);
+
+    private final TableInfo tableInfo;
+    private final SchemaGetter schemaGetter;
+    private final MetadataUpdater metadataUpdater;
+
+    public TableKvScan(
+            TableInfo tableInfo, SchemaGetter schemaGetter, MetadataUpdater 
metadataUpdater) {
+        this.tableInfo = tableInfo;
+        this.schemaGetter = schemaGetter;
+        this.metadataUpdater = metadataUpdater;
+    }
+
+    @Override
+    public CloseableIterator<InternalRow> execute() {
+        return new AllBucketsIterator(discoverBuckets());
+    }
+
+    /**
+     * Returns the full ordered list of {@link TableBucket}s to scan.
+     *
+     * <ul>
+     *   <li>Non-partitioned tables: buckets 0 … numBuckets-1, no partitionId.
+     *   <li>Partitioned tables: metadata refreshed from the server; buckets 0 
… numBuckets-1 for
+     *       each discovered partition.
+     * </ul>
+     */
+    private List<TableBucket> discoverBuckets() {
+        TablePath tablePath = tableInfo.getTablePath();
+        long tableId = tableInfo.getTableId();
+        int numBuckets = tableInfo.getNumBuckets();
+        List<TableBucket> buckets = new ArrayList<>();
+
+        if (!tableInfo.isPartitioned()) {
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(new TableBucket(tableId, b));
+            }
+        } else {
+            // Ensure partition metadata is current before iterating.
+            metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+            // TODO: getPartitionIdByPath() iterates all partitions for all 
tables in the cluster.
+            //  Replace with a targeted per-table lookup once the API is 
available.
+            Map<PhysicalTablePath, Long> partitionIdByPath =
+                    metadataUpdater.getCluster().getPartitionIdByPath();
+            for (Map.Entry<PhysicalTablePath, Long> entry : 
partitionIdByPath.entrySet()) {
+                if (tablePath.equals(entry.getKey().getTablePath())) {
+                    long partitionId = entry.getValue();
+                    for (int b = 0; b < numBuckets; b++) {
+                        buckets.add(new TableBucket(tableId, partitionId, b));
+                    }
+                }
+            }
+        }
+        return buckets;
+    }
+
+    /** Chains through all per-bucket {@link KvBatchScanner}s as a single lazy 
iterator. */
+    private class AllBucketsIterator implements CloseableIterator<InternalRow> 
{
+
+        private final List<TableBucket> buckets;
+        private int bucketIndex = 0;
+        @Nullable private KvBatchScanner currentScanner = null;
+        @Nullable private CloseableIterator<InternalRow> currentBatch = null;
+        private boolean exhausted = false;
+
+        AllBucketsIterator(List<TableBucket> buckets) {
+            this.buckets = buckets;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (exhausted) {
+                return false;
+            }
+            if (currentBatch != null && currentBatch.hasNext()) {
+                return true;
+            }
+            return fillNextBatch();
+        }

Review Comment:
   `currentBatch` is never closed when it becomes exhausted, and `close()` 
nulls `currentBatch` without closing it. Since `CloseableIterator` can 
encapsulate resources, this can leak per-batch resources across bucket 
transitions and on early termination. Suggested fix: close the previous 
`currentBatch` when advancing to the next batch (when `hasNext()` becomes 
false) and also close it in `close()`.
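
   The suggested fix can be sketched as follows. This is a simplified
illustration, not the actual `AllBucketsIterator`: `RowBatch` is a hypothetical
stand-in for `CloseableIterator`, and `ListBatch` exists only to make the close
calls observable.

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   // Hypothetical stand-in for org.apache.fluss.utils.CloseableIterator.
   interface RowBatch<T> extends Iterator<T>, AutoCloseable {
       @Override
       void close();
   }

   // In-memory batch used only to observe when close() is called.
   final class ListBatch<T> implements RowBatch<T> {
       private final Iterator<T> rows;
       boolean closed = false;

       ListBatch(List<T> rows) {
           this.rows = rows.iterator();
       }

       @Override
       public boolean hasNext() {
           return rows.hasNext();
       }

       @Override
       public T next() {
           return rows.next();
       }

       @Override
       public void close() {
           closed = true;
       }
   }

   // Chains batches and closes each one once drained, or on an early close().
   final class ChainedBatches<T> implements RowBatch<T> {
       private final Iterator<RowBatch<T>> batches;
       private RowBatch<T> current;

       ChainedBatches(List<RowBatch<T>> batches) {
           this.batches = batches.iterator();
       }

       @Override
       public boolean hasNext() {
           while (current == null || !current.hasNext()) {
               closeCurrent(); // release the drained batch before advancing
               if (!batches.hasNext()) {
                   return false;
               }
               current = batches.next();
           }
           return true;
       }

       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           return current.next();
       }

       @Override
       public void close() {
           closeCurrent(); // early termination: close instead of only nulling
       }

       private void closeCurrent() {
           if (current != null) {
               current.close();
               current = null;
           }
       }
   }
   ```

   Routing both paths through one `closeCurrent()` helper keeps the normal and
early-termination cases from drifting apart.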



##########
fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java:
##########
@@ -257,4 +259,9 @@ public CompletableFuture<DescribeClusterConfigsResponse> 
describeClusterConfigs(
             DescribeClusterConfigsRequest request) {
         return null;
     }
+
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        return null;

Review Comment:
   Returning `null` here risks NPEs in any tests that call `scanKv` through 
`TestingTabletGatewayService`. Returning a completed/failed `CompletableFuture` 
is safer and aligns with the gateway contract.
   ```suggestion
           CompletableFuture<ScanKvResponse> future = new CompletableFuture<>();
           future.completeExceptionally(
                   new UnsupportedOperationException(
                           "scanKv is not supported in "
                                   + "TestingTabletGatewayService."));
           return future;
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -422,6 +433,114 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> 
notifyLakeTableOffset(
         return response;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {

Review Comment:
   `ScanKvResponse` includes `log_offset` (‘Only set on the first response’) in 
the proto, but the server never sets it. Either populate it on scan initiation 
(and plumb the value through `KvTablet.openScan` / `ScannerContext`) or 
remove/adjust the field/comment to avoid an API contract that is never honored.



##########
fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java:
##########
@@ -431,4 +433,9 @@ private StopReplicaResponse mockStopReplicaResponse(
         
stopReplicaResponse.addAllStopReplicasResps(protoStopReplicaRespForBuckets);
         return stopReplicaResponse;
     }
+
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        return null;

Review Comment:
   Returning `null` for a method declared to return `CompletableFuture<...>` 
can cause NPEs if this test gateway is exercised by any scan-related tests. 
Prefer returning `CompletableFuture.completedFuture(...)` (or a failed future) 
to make failures deterministic and easier to diagnose.
   ```suggestion
           CompletableFuture<ScanKvResponse> future = new CompletableFuture<>();
           future.completeExceptionally(
                   new UnsupportedOperationException(
                           "scanKv is not supported in TestTabletServerGateway"));
           return future;
   ```



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.DefaultValueRecordBatch;
+import org.apache.fluss.record.ValueRecord;
+import org.apache.fluss.record.ValueRecordReadContext;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.PbScanReqForBucket;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.SchemaUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link BatchScanner} that streams all live rows from a single KV bucket 
by iterating the tablet
+ * server's RocksDB instance via a sequence of ScanKv RPCs.
+ *
+ * <p>The remote scanner is opened lazily on the first {@link #pollBatch} 
call. Once the bucket is
+ * exhausted the scanner closes itself on the server. If the caller needs to 
abort early it must
+ * call {@link #close} explicitly.
+ *
+ * <p>After each response with {@code has_more_results = true} the next RPC is 
fired immediately,
+ * overlapping network latency with the caller's row processing. At most one 
request is in-flight at
+ * any time.
+ *
+ * <p>Not reusable and not thread-safe.
+ */
+public class KvBatchScanner implements BatchScanner {
+    private static final int BATCH_SIZE_BYTES = 4 * 1024 * 1024;

Review Comment:
   The batch size is hard-coded to 4MB even though a new config option 
`client.scanner.kv.fetch.max-bytes` was added. This makes the config 
ineffective for KV scans. Consider sourcing the batch size from client/table 
configuration and using that value here (with a sane minimum to guarantee 
forward progress).
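
   A minimal sketch of resolving the batch size from configuration with a
floor. `KvScanBatchSize` and the 1 KiB minimum are assumptions made for
illustration; how the value of `client.scanner.kv.fetch.max-bytes` is read from
the client configuration is left out.

   ```java
   // Hypothetical helper; not part of the PR.
   final class KvScanBatchSize {
       /** Falls back to the value that is currently hard-coded (4 MiB). */
       static final int DEFAULT_BYTES = 4 * 1024 * 1024;

       /** Assumed floor so that a tiny or zero setting cannot stall the scan. */
       static final int MIN_BYTES = 1024;

       private KvScanBatchSize() {}

       /** Returns the batch size to send, given the configured value or null if unset. */
       static int resolve(Integer configuredBytes) {
           if (configuredBytes == null) {
               return DEFAULT_BYTES;
           }
           return Math.max(MIN_BYTES, configuredBytes);
       }
   }
   ```

   The scanner would then take the resolved value in its constructor instead of
reading the `BATCH_SIZE_BYTES` constant.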



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.DefaultValueRecordBatch;
+import org.apache.fluss.record.ValueRecord;
+import org.apache.fluss.record.ValueRecordReadContext;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.PbScanReqForBucket;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.SchemaUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link BatchScanner} that streams all live rows from a single KV bucket by iterating the tablet
+ * server's RocksDB instance via a sequence of ScanKv RPCs.
+ *
+ * <p>The remote scanner is opened lazily on the first {@link #pollBatch} call. Once the bucket is
+ * exhausted the scanner closes itself on the server. If the caller needs to abort early it must
+ * call {@link #close} explicitly.
+ *
+ * <p>After each response with {@code has_more_results = true} the next RPC is fired immediately,
+ * overlapping network latency with the caller's row processing. At most one request is in-flight at
+ * any time.
+ *
+ * <p>Not reusable and not thread-safe.
+ */
+public class KvBatchScanner implements BatchScanner {
+    private static final int BATCH_SIZE_BYTES = 4 * 1024 * 1024;
+
+    private final TablePath tablePath;
+    private final TableBucket tableBucket;
+    private final SchemaGetter schemaGetter;
+    private final MetadataUpdater metadataUpdater;
+    private final int targetSchemaId;
+
+    /** Cache for schema evolution index mappings. */
+    private final Map<Short, int[]> schemaMappingCache = new HashMap<>();
+
+    /** Reused across all batches; schemaGetter and kvFormat never change. */
+    private final ValueRecordReadContext readContext;
+
+    private boolean done = false;
+    @Nullable private TabletServerGateway gateway;
+    @Nullable private byte[] scannerId;
+
+    /** Monotonically increasing ID sent with each continuation request, starting at 0. */
+    private int callSeqId = 0;
+
+    @Nullable private CompletableFuture<ScanKvResponse> prefetchFuture;
+
+    public KvBatchScanner(
+            TableInfo tableInfo,
+            TableBucket tableBucket,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater) {
+        this.tablePath = tableInfo.getTablePath();
+        this.tableBucket = tableBucket;
+        this.schemaGetter = schemaGetter;
+        this.metadataUpdater = metadataUpdater;
+        this.targetSchemaId = tableInfo.getSchemaId();
+        this.readContext =
+                ValueRecordReadContext.createReadContext(
+                        schemaGetter, tableInfo.getTableConfig().getKvFormat());
+    }
+
+    /**
+     * Returns the next batch of rows.
+     *
+     * <ul>
+     *   <li>Returns an empty iterator if the in-flight RPC has not completed within {@code
+     *       timeout}.
+     *   <li>Returns {@code null} when the bucket is exhausted or the scanner has been closed.
+     * </ul>
+     */
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
+        if (done) {
+            return null;
+        }
+        if (prefetchFuture == null) {
+            try {
+                openScanner();
+            } catch (Exception e) {
+                done = true;
+                // TODO: handle LeaderNotAvailableException with retry (see LimitBatchScanner).
+                throw new IOException("Failed to open scanner for bucket " + tableBucket, e);
+            }
+        }
+
+        ScanKvResponse response;
+        try {
+            response = prefetchFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            return CloseableIterator.emptyIterator();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        prefetchFuture = null;
+
+        if (response.hasScannerId()) {
+            scannerId = response.getScannerId();
+        }
+
+        boolean hasMore = response.hasHasMoreResults() && response.isHasMoreResults();
+        if (hasMore) {
+            sendContinuation();
+        } else {
+            done = true;
+        }
+
+        if (!response.hasRecords()) {
+            // Empty last batch or empty bucket.
+            return done ? null : CloseableIterator.emptyIterator();
+        }
+        List<InternalRow> rows = parseRecords(response);
+        return CloseableIterator.wrap(rows.iterator());
+    }
+
+    /**
+     * Releases all resources. Cancels any in-flight prefetch and sends a {@code close_scanner=true}
+     * RPC to free the server-side session immediately. Idempotent.
+     */
+    @Override
+    public void close() throws IOException {
+        if (done) {
+            return;
+        }
+        done = true;
+        if (prefetchFuture != null) {
+            prefetchFuture.cancel(true);
+            prefetchFuture = null;
+        }
+        if (scannerId != null && gateway != null) {
+            // Fire-and-forget: the server will close the session on receipt.
+            gateway.scanKv(new ScanKvRequest().setScannerId(scannerId).setCloseScanner(true));

Review Comment:
   Two client-side issues:
   (1) `close()` sends a `ScanKvRequest` without setting `batch_size_bytes`,
   but the proto marks that field as `required` and the server reads it, so
   close requests can be invalid or fall back to a batch size of 0.
   (2) `pollBatch()` never checks `error_code`/`error_message` and treats error
   responses as successful batches. It should translate non-zero error codes
   into an exception (and potentially trigger a metadata refresh or retry for
   retriable errors).
   ```suggestion
               gateway.scanKv(
                       new ScanKvRequest()
                               .setScannerId(scannerId)
                               .setCloseScanner(true)
                               .setBatchSizeBytes(BATCH_SIZE_BYTES));
   ```
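
The suggestion above covers only issue (1). For issue (2), a minimal sketch of
the missing check could look like the following. `ScanKvErrorCheck` and
`NO_ERROR` are hypothetical names, and treating code 0 as success is an
assumption taken from the phrase "non-zero error codes"; the values would come
from the response's `error_code` and `error_message` fields.

```java
import java.io.IOException;

/** Hypothetical helper, not part of Fluss: turns an error response into an exception. */
final class ScanKvErrorCheck {
    /** Assumption: error code 0 means success, as "non-zero error codes" implies. */
    static final int NO_ERROR = 0;

    private ScanKvErrorCheck() {}

    /**
     * Throws if the response carries a non-zero error code; otherwise returns normally.
     * The caller passes the values read from the response's error_code and error_message.
     */
    static void throwIfError(int errorCode, String errorMessage, String bucket)
            throws IOException {
        if (errorCode != NO_ERROR) {
            throw new IOException(
                    "ScanKv failed for bucket " + bucket
                            + " (error code " + errorCode + "): " + errorMessage);
        }
    }
}
```

In `pollBatch()` such a check would presumably run right after
`prefetchFuture.get(...)` returns and before `scanner_id` or
`has_more_results` is read, so that an error response never reaches
`parseRecords`. Retry and metadata refresh for retriable errors are left out
of this sketch.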



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java:
##########
@@ -0,0 +1,232 @@
+    private void openScanner() {
+        if (tableBucket.getPartitionId() != null) {
+            metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
+        }
+        int leader = metadataUpdater.leaderFor(tablePath, tableBucket);
+        gateway = metadataUpdater.newTabletServerClientForNode(leader);
+        if (gateway == null) {
+            throw new LeaderNotAvailableException(
+                    "Server " + leader + " is not found in metadata cache.");
+        }
+
+        PbScanReqForBucket bucketReq =
+                new PbScanReqForBucket()
+                        .setTableId(tableBucket.getTableId())
+                        .setBucketId(tableBucket.getBucket());
+        if (tableBucket.getPartitionId() != null) {
+            bucketReq.setPartitionId(tableBucket.getPartitionId());
+        }
+
+        ScanKvRequest request =
+                new ScanKvRequest().setBucketScanReq(bucketReq).setBatchSizeBytes(BATCH_SIZE_BYTES);
+        prefetchFuture = gateway.scanKv(request);
+    }
+
+    private void sendContinuation() {
+        ScanKvRequest request =
+                new ScanKvRequest()
+                        .setScannerId(scannerId)
+                        .setBatchSizeBytes(BATCH_SIZE_BYTES)

Review Comment:
   The batch size is hard-coded to 4 MB (`BATCH_SIZE_BYTES`) even though a new
   config option, `client.scanner.kv.fetch.max-bytes`, was added, so that option
   has no effect on KV scans. Consider reading the batch size from the
   client/table configuration and using that value here, clamped to a sane
   positive minimum so that every request can make forward progress.



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java:
##########
@@ -0,0 +1,232 @@
+    private void openScanner() {
+        if (tableBucket.getPartitionId() != null) {
+            metadataUpdater.checkAndUpdateMetadata(tablePath, tableBucket);
+        }
+        int leader = metadataUpdater.leaderFor(tablePath, tableBucket);
+        gateway = metadataUpdater.newTabletServerClientForNode(leader);
+        if (gateway == null) {
+            throw new LeaderNotAvailableException(
+                    "Server " + leader + " is not found in metadata cache.");
+        }
+
+        PbScanReqForBucket bucketReq =
+                new PbScanReqForBucket()
+                        .setTableId(tableBucket.getTableId())
+                        .setBucketId(tableBucket.getBucket());
+        if (tableBucket.getPartitionId() != null) {
+            bucketReq.setPartitionId(tableBucket.getPartitionId());
+        }
+
+        ScanKvRequest request =
+                new ScanKvRequest().setBucketScanReq(bucketReq).setBatchSizeBytes(BATCH_SIZE_BYTES);

Review Comment:
   The batch size is hard-coded to 4 MB (`BATCH_SIZE_BYTES`) even though a new
   config option, `client.scanner.kv.fetch.max-bytes`, was added, so that option
   has no effect on KV scans. Consider reading the batch size from the
   client/table configuration and using that value here, clamped to a sane
   positive minimum so that every request can make forward progress.



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableKvScan.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.table.scanner.batch.KvBatchScanner;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link KvScan} that discovers all buckets across all partitions and chains
+ * {@link KvBatchScanner} instances to produce a unified lazy iterator.
+ *
+ * <p>Bucket list is built eagerly at {@link #execute()} time; for partitioned tables partition
+ * metadata is refreshed from the server before building the list. Individual {@link KvBatchScanner}
+ * instances are created lazily one at a time as each bucket is consumed.
+ */
+public class TableKvScan implements KvScan {
+
+    /**
+     * Short poll timeout used inside the iterator's {@code hasNext()} loop. This keeps the busy
+     * loop responsive while still allowing the prefetch to arrive.
+     *
+     * <p>TODO: make this configurable via a client config option.
+     */
+    private static final Duration POLL_TIMEOUT = Duration.ofMillis(10);
+
+    private final TableInfo tableInfo;
+    private final SchemaGetter schemaGetter;
+    private final MetadataUpdater metadataUpdater;
+
+    public TableKvScan(
+            TableInfo tableInfo, SchemaGetter schemaGetter, MetadataUpdater metadataUpdater) {
+        this.tableInfo = tableInfo;
+        this.schemaGetter = schemaGetter;
+        this.metadataUpdater = metadataUpdater;
+    }
+
+    @Override
+    public CloseableIterator<InternalRow> execute() {
+        return new AllBucketsIterator(discoverBuckets());
+    }
+
+    /**
+     * Returns the full ordered list of {@link TableBucket}s to scan.
+     *
+     * <ul>
+     *   <li>Non-partitioned tables: buckets 0 … numBuckets-1, no partitionId.
+     *   <li>Partitioned tables: metadata refreshed from the server; buckets 0 … numBuckets-1 for
+     *       each discovered partition.
+     * </ul>
+     */
+    private List<TableBucket> discoverBuckets() {
+        TablePath tablePath = tableInfo.getTablePath();
+        long tableId = tableInfo.getTableId();
+        int numBuckets = tableInfo.getNumBuckets();
+        List<TableBucket> buckets = new ArrayList<>();
+
+        if (!tableInfo.isPartitioned()) {
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(new TableBucket(tableId, b));
+            }
+        } else {
+            // Ensure partition metadata is current before iterating.
+            metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+            // TODO: getPartitionIdByPath() iterates all partitions for all tables in the cluster.
+            //  Replace with a targeted per-table lookup once the API is available.
+            Map<PhysicalTablePath, Long> partitionIdByPath =
+                    metadataUpdater.getCluster().getPartitionIdByPath();
+            for (Map.Entry<PhysicalTablePath, Long> entry : partitionIdByPath.entrySet()) {
+                if (tablePath.equals(entry.getKey().getTablePath())) {
+                    long partitionId = entry.getValue();
+                    for (int b = 0; b < numBuckets; b++) {
+                        buckets.add(new TableBucket(tableId, partitionId, b));
+                    }
+                }
+            }
+        }
+        return buckets;
+    }
+
+    /** Chains through all per-bucket {@link KvBatchScanner}s as a single lazy iterator. */
+    private class AllBucketsIterator implements CloseableIterator<InternalRow> {
+
+        private final List<TableBucket> buckets;
+        private int bucketIndex = 0;
+        @Nullable private KvBatchScanner currentScanner = null;
+        @Nullable private CloseableIterator<InternalRow> currentBatch = null;
+        private boolean exhausted = false;
+
+        AllBucketsIterator(List<TableBucket> buckets) {
+            this.buckets = buckets;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (exhausted) {
+                return false;
+            }
+            if (currentBatch != null && currentBatch.hasNext()) {
+                return true;
+            }
+            return fillNextBatch();
+        }
+
+        /**
+         * Advances {@link #currentBatch} to the next non-empty batch, opening the next scanner when
+         * the current one is exhausted. Returns {@code true} once a row is available.
+         */
+        private boolean fillNextBatch() {
+            while (true) {
+                if (currentScanner == null) {
+                    if (bucketIndex >= buckets.size()) {
+                        exhausted = true;
+                        return false;
+                    }
+                    currentScanner =
+                            new KvBatchScanner(
+                                    tableInfo,
+                                    buckets.get(bucketIndex++),
+                                    schemaGetter,
+                                    metadataUpdater);
+                }
+                try {
+                    CloseableIterator<InternalRow> batch = currentScanner.pollBatch(POLL_TIMEOUT);
+                    if (batch == null) {
+                        // Bucket exhausted; server session already cleaned up.
+                        currentScanner = null;
+                        continue;
+                    }
+                    if (batch.hasNext()) {
+                        currentBatch = batch;
+                        return true;
+                    }
+                    // Empty batch (RPC not yet ready) — close and spin.
+                    batch.close();
+                } catch (IOException e) {
+                    throw new FlussRuntimeException("Failed to poll KV scan batch", e);
+                }
+            }
+        }
+
+        @Override
+        public InternalRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentBatch.next();
+        }
+
+        @Override
+        public void close() {
+            exhausted = true;
+            currentBatch = null;
+            if (currentScanner != null) {
+                try {
+                    currentScanner.close();
+                } catch (IOException ignored) {
+                }
+                currentScanner = null;
+            }
+        }

Review Comment:
   `currentBatch` is never closed when it becomes exhausted, and `close()` 
nulls `currentBatch` without closing it. Since `CloseableIterator` can 
encapsulate resources, this can leak per-batch resources across bucket 
transitions and on early termination. Suggested fix: close the previous 
`currentBatch` when advancing to the next batch (when `hasNext()` becomes 
false) and also close it in `close()`.
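
A minimal sketch of the fix described above, not the PR's actual code. `TrackedBatch` and `BatchChain` are hypothetical stand-ins for `CloseableIterator<InternalRow>` and the iterator wrapper under review. The sketch only illustrates the two closing points: a drained batch is closed before the next one is taken, and `close()` closes whatever is still held instead of just nulling the reference.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a CloseableIterator; records whether it was closed. */
final class TrackedBatch implements Iterator<String>, AutoCloseable {
    private final Iterator<String> rows;
    boolean closed;

    TrackedBatch(String... rows) {
        this.rows = Arrays.asList(rows).iterator();
    }

    @Override public boolean hasNext() { return rows.hasNext(); }
    @Override public String next() { return rows.next(); }
    @Override public void close() { closed = true; }
}

/** Iterates over a sequence of batches, closing each one as soon as it is drained. */
final class BatchChain implements Iterator<String>, AutoCloseable {
    private final Deque<TrackedBatch> pending;
    private TrackedBatch currentBatch;

    BatchChain(List<TrackedBatch> batches) {
        this.pending = new ArrayDeque<>(batches);
    }

    @Override
    public boolean hasNext() {
        while (currentBatch == null || !currentBatch.hasNext()) {
            closeCurrentBatch(); // release the drained batch before advancing
            if (pending.isEmpty()) {
                return false;
            }
            currentBatch = pending.poll();
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentBatch.next();
    }

    @Override
    public void close() {
        closeCurrentBatch(); // early termination: close, do not just null out
        for (TrackedBatch batch : pending) {
            batch.close();
        }
        pending.clear();
    }

    private void closeCurrentBatch() {
        if (currentBatch != null) {
            currentBatch.close();
            currentBatch = null;
        }
    }
}
```

In the reviewed code the equivalent of `closeCurrentBatch()` would presumably be called from `fillNextBatch()` before a new batch is assigned and from `close()`. Where exactly it belongs depends on how `hasNext()` is implemented, which is not shown in this hunk.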



##########
fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java:
##########
@@ -422,6 +433,114 @@ public CompletableFuture<NotifyLakeTableOffsetResponse> 
notifyLakeTableOffset(
         return response;
     }
 
+    @Override
+    public CompletableFuture<ScanKvResponse> scanKv(ScanKvRequest request) {
+        ScanKvResponse response = new ScanKvResponse();
+        try {
+            ScannerContext context;
+
+            if (request.hasBucketScanReq()) {
+                // New scan: open a fresh scanner session
+                PbScanReqForBucket bucketReq = request.getBucketScanReq();
+                long tableId = bucketReq.getTableId();
+                authorizeTable(READ, tableId);
+
+                TableBucket tableBucket =
+                        new TableBucket(
+                                tableId,
+                                bucketReq.hasPartitionId() ? 
bucketReq.getPartitionId() : null,
+                                bucketReq.getBucketId());
+                Long limit = bucketReq.hasLimit() ? bucketReq.getLimit() : 
null;
+
+                context =
+                        scannerManager.createScanner(
+                                replicaManager.getLeaderKvTablet(tableBucket), 
tableBucket, limit);
+
+                if (context == null) {
+                    // Bucket is empty — return an empty response immediately 
without registering a
+                    // session.
+                    response.setHasMoreResults(false);
+                    return CompletableFuture.completedFuture(response);
+                }
+            } else {
+                if (!request.hasScannerId()) {
+                    throw new InvalidScanRequestException(
+                            "ScanKvRequest must have either bucket_scan_req 
(new scan) "
+                                    + "or scanner_id (continuation).");
+                }
+                byte[] scannerId = request.getScannerId();
+                context = scannerManager.getScanner(scannerId);
+                if (context == null) {
+                    String msg =
+                            scannerManager.isRecentlyExpired(scannerId)
+                                    ? "Scanner session has expired due to 
inactivity. "
+                                            + "Please start a new scan."
+                                    : "Unknown scanner ID. The session may 
have expired or "
+                                            + "never existed.";
+                    throw new UnknownScannerIdException(msg);
+                }
+                // Validate call-sequence ordering to detect duplicate or 
out-of-order requests.
+                // getScanner() already refreshed the last-access timestamp.
+                if (request.hasCallSeqId()) {
+                    int expectedSeqId = context.getCallSeqId() + 1;
+                    int requestSeqId = request.getCallSeqId();
+                    if (requestSeqId != expectedSeqId) {
+                        throw new InvalidScanRequestException(
+                                String.format(
+                                        "Out-of-order scan request: expected 
callSeqId=%d but got %d.",
+                                        expectedSeqId, requestSeqId));
+                    }
+                }
+            }
+
+            // Handle explicit close request
+            if (request.hasCloseScanner() && request.isCloseScanner()) {
+                scannerManager.removeScanner(context);
+                response.setScannerId(context.getScannerId());
+                response.setHasMoreResults(false);
+                return CompletableFuture.completedFuture(response);
+            }
+
+            // Build the next batch
+            int batchSizeBytes = request.getBatchSizeBytes();
+            DefaultValueRecordBatch.Builder builder = 
DefaultValueRecordBatch.builder();
+            int totalBytes = 0;
+
+            while (context.isValid() && totalBytes < batchSizeBytes) {
+                byte[] value = context.currentValue();
+                builder.append(value);
+                totalBytes += value.length;
+                context.advance();
+            }
+
+            boolean hasMore = context.isValid();
+            DefaultValueRecordBatch batch = builder.build();
+
+            response.setScannerId(context.getScannerId());
+            response.setHasMoreResults(hasMore);

Review Comment:
   `ScanKvResponse` includes `log_offset` (‘Only set on the first response’) in
   the proto, but the server never sets it. Either populate it on scan initiation
   (and plumb the value through `KvTablet.openScan` / `ScannerContext`), or
   remove the field or adjust its comment so the API does not promise a value
   that is never delivered.
   ```suggestion
               response.setHasMoreResults(hasMore);
               // Populate log_offset so that the API contract is honored.
               // Currently we do not track a real log offset here, so we use a neutral default.
               response.setLogOffset(0L);
   ```



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java:
##########
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.scan;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.TooManyScannersException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.utils.AutoCloseableAsync;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FutureUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manages server-side KV full-scan sessions ({@link ScannerContext}).
+ *
+ * <p>Each KV full scan opens a persistent server-side session that holds a 
point-in-time RocksDB
+ * snapshot and a cursor. Sessions are keyed by a server-assigned UUID-based 
scanner ID and persist
+ * across multiple batched-fetch RPCs from the same client.
+ *
+ * <h3>Concurrency limits</h3>
+ *
+ * <ul>
+ *   <li><b>Per-bucket:</b> at most {@code maxPerBucket} concurrent sessions 
on any single bucket.
+ *   <li><b>Per-server:</b> at most {@code maxPerServer} concurrent sessions 
across all buckets.
+ * </ul>
+ *
+ * <p>Limit enforcement is two-phase: a fast pre-check guards the common case; 
the subsequent atomic
+ * increment with re-check and rollback prevents the TOCTOU race from 
permanently breaching the
+ * configured limits. Exceeding either limit causes {@link 
TooManyScannersException}.
+ *
+ * <h3>Empty bucket handling</h3>
+ *
+ * <p>If the target bucket contains no rows at the time the scan is opened, 
{@link
+ * #createScanner(KvTablet, TableBucket, Long)} returns {@code null} without 
consuming a limit slot.
+ * The caller should return an empty response immediately.
+ *
+ * <h3>TTL eviction</h3>
+ *
+ * <p>A background reaper task runs every {@code 
server.scanner.expiration-interval} and evicts
+ * sessions idle longer than {@code server.scanner.ttl}. Recently evicted IDs 
are retained for
+ * {@code 2 × ttl} so callers can distinguish "expired" from "never existed."
+ *
+ * <h3>Leadership change</h3>
+ *
+ * {@link #closeScannersForBucket(TableBucket)} must be called when a bucket 
loses leadership to
+ * release all RocksDB snapshot/iterator resources for that bucket promptly.
+ */
+public class ScannerManager implements AutoCloseableAsync {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ScannerManager.class);
+
+    private final Map<String, ScannerContext> scanners = 
MapUtils.newConcurrentHashMap();
+    private final Map<String, Long> recentlyExpiredIds = 
MapUtils.newConcurrentHashMap();
+
+    /** Per-bucket active scanner count, used for O(1) per-bucket limit 
enforcement. */
+    private final Map<TableBucket, AtomicInteger> perBucketCount = 
MapUtils.newConcurrentHashMap();
+
+    /** Total active scanner count across all buckets on this tablet server. */
+    private final AtomicInteger totalScanners = new AtomicInteger(0);
+
+    private final Clock clock;
+    private final long scannerTtlMs;
+    private final long recentlyExpiredRetentionMs;
+    private final int maxPerBucket;
+    private final int maxPerServer;
+
+    @Nullable private ScheduledFuture<?> cleanupTask;
+
+    public ScannerManager(Configuration conf, Scheduler scheduler) {
+        this(conf, scheduler, SystemClock.getInstance());
+    }
+
+    @VisibleForTesting
+    ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) {
+        this.clock = clock;
+        this.scannerTtlMs = 
conf.get(ConfigOptions.SERVER_SCANNER_TTL).toMillis();
+        this.recentlyExpiredRetentionMs = 2 * scannerTtlMs;
+        this.maxPerBucket = 
conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_BUCKET);
+        this.maxPerServer = 
conf.get(ConfigOptions.SERVER_SCANNER_MAX_PER_SERVER);
+
+        long expirationIntervalMs =
+                
conf.get(ConfigOptions.SERVER_SCANNER_EXPIRATION_INTERVAL).toMillis();
+        this.cleanupTask =
+                scheduler.schedule(
+                        "scanner-expiration",
+                        this::cleanupExpiredScanners,
+                        expirationIntervalMs,
+                        expirationIntervalMs);
+
+        LOG.info(
+                "Started ScannerManager: ttl={}ms, expirationInterval={}ms, "
+                        + "maxPerBucket={}, maxPerServer={}",
+                scannerTtlMs,
+                expirationIntervalMs,
+                maxPerBucket,
+                maxPerServer);
+    }
+
+    /**
+     * Creates a new scan session for the given bucket, taking a point-in-time 
RocksDB snapshot.
+     *
+     * <p>Returns {@code null} if the bucket is empty (no rows to scan). In 
that case no session
+     * slot is consumed and the caller should return an empty response 
immediately.
+     *
+     * <p><b>Limit enforcement is two-phase:</b> a fast pre-check guards the 
common case; the
+     * subsequent atomic increment + re-check prevents the TOCTOU race from 
permanently breaching
+     * configured limits. If registration fails after the snapshot is already 
opened, the context is
+     * closed and the exception is re-thrown to avoid leaking resources.
+     *
+     * @param kvTablet the {@link KvTablet} for the bucket; used to open the 
snapshot
+     * @param tableBucket the bucket being scanned
+     * @param limit optional row-count limit ({@code null} or ≤ 0 means 
unlimited)
+     * @return the newly registered {@link ScannerContext}, or {@code null} if 
the bucket is empty
+     * @throws TooManyScannersException if the per-bucket or per-server limit 
is exceeded
+     * @throws IOException if the underlying {@link 
org.apache.fluss.server.utils.ResourceGuard} is
+     *     already closed (the bucket is shutting down)
+     */
+    @Nullable
+    public ScannerContext createScanner(
+            KvTablet kvTablet, TableBucket tableBucket, @Nullable Long limit) 
throws IOException {
+        checkLimits(tableBucket);
+
+        String scannerId = generateScannerId();
+        ScannerContext context =
+                kvTablet.openScan(scannerId, limit != null ? limit : -1L, 
clock.milliseconds());
+        if (context == null) {
+            // Bucket is empty — no session slot consumed.
+            return null;
+        }
+
+        try {
+            registerContext(context, tableBucket);
+        } catch (TooManyScannersException e) {
+            // Limit was exceeded between the initial check and registration 
(race window).
+            // Close the already-opened context to avoid leaking the snapshot 
and lease.
+            closeScannerContext(context);
+            throw e;
+        }
+        return context;
+    }
+
+    /**
+     * Looks up an existing scanner session by its raw ID bytes and refreshes 
its last-access
+     * timestamp.
+     *
+     * @return the {@link ScannerContext}, or {@code null} if not found (may 
have expired or never
+     *     existed)
+     */
+    @Nullable
+    public ScannerContext getScanner(byte[] scannerId) {
+        ScannerContext context = scanners.get(new String(scannerId, 
StandardCharsets.UTF_8));
+        if (context != null) {
+            context.updateLastAccessTime(clock.milliseconds());
+        }
+        return context;
+    }
+
+    /**
+     * Returns {@code true} if the given scanner ID belongs to a session that 
was recently evicted
+     * by the TTL reaper (within the last {@code 2 × ttlMs}).
+     *
+     * <p>Callers can use this to distinguish "scanner expired" from "unknown 
scanner ID."
+     */
+    public boolean isRecentlyExpired(byte[] scannerId) {
+        return recentlyExpiredIds.containsKey(new String(scannerId, 
StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Removes and closes a known scanner context directly, avoiding a map 
lookup.
+     *
+     * <p>Uses a conditional remove ({@link 
java.util.concurrent.ConcurrentHashMap#remove(Object,
+     * Object)}) so that concurrent calls — e.g. from the TTL reaper and a 
close-scanner RPC
+     * arriving simultaneously — result in exactly one winner closing the 
context, preventing
+     * double-release of the non-idempotent {@link
+     * org.apache.fluss.server.utils.ResourceGuard.Lease}.
+     */
+    public void removeScanner(ScannerContext context) {
+        if (scanners.remove(context.getId(), context)) {
+            decrementCounts(context.getTableBucket());
+            closeScannerContext(context);
+        }
+    }
+
+    /**
+     * Looks up and removes a scanner session by its raw ID bytes.
+     *
+     * <p>No-op if the ID is not found (already removed or expired).
+     */
+    public void removeScanner(byte[] scannerId) {
+        String key = new String(scannerId, StandardCharsets.UTF_8);
+        ScannerContext context = scanners.remove(key);
+        if (context != null) {
+            decrementCounts(context.getTableBucket());
+            closeScannerContext(context);
+        }
+    }
+
+    /** Returns the total number of active scanner sessions on this tablet 
server. */
+    @VisibleForTesting
+    public int activeScannerCount() {
+        return totalScanners.get();
+    }
+
+    /** Returns the number of active scanner sessions for the given bucket. */
+    @VisibleForTesting
+    public int activeScannerCountForBucket(TableBucket tableBucket) {
+        AtomicInteger count = perBucketCount.get(tableBucket);
+        return count == null ? 0 : count.get();
+    }
+
+    /**
+     * Closes and removes all active scanner sessions for the given bucket. 
Must be called when a
+     * bucket loses leadership to prevent stale RocksDB snapshot/iterator 
leaks.
+     */
+    public void closeScannersForBucket(TableBucket tableBucket) {
+        List<String> toRemove = new ArrayList<>();
+        for (Map.Entry<String, ScannerContext> entry : scanners.entrySet()) {
+            if (tableBucket.equals(entry.getValue().getTableBucket())) {
+                toRemove.add(entry.getKey());
+            }
+        }
+        for (String key : toRemove) {
+            ScannerContext context = scanners.remove(key);
+            if (context != null) {
+                decrementCounts(tableBucket);
+                LOG.info(
+                        "Closing scanner {} for bucket {} due to leadership 
change.",
+                        key,
+                        tableBucket);
+                closeScannerContext(context);
+            }
+        }
+    }
+
+    /**
+     * Fast pre-check of per-server and per-bucket limits before opening the 
snapshot. This is a
+     * best-effort check; a small race window exists between the check and 
{@link #registerContext}.
+     * The race is handled by the atomic re-check inside {@link 
#registerContext}.
+     */
+    private void checkLimits(TableBucket tableBucket) {
+        if (totalScanners.get() >= maxPerServer) {
+            throw new TooManyScannersException(
+                    String.format(
+                            "Cannot create scanner for bucket %s: server-wide 
limit of %d reached.",
+                            tableBucket, maxPerServer));
+        }
+        AtomicInteger bucketCount =
+                perBucketCount.computeIfAbsent(tableBucket, k -> new 
AtomicInteger(0));
+        if (bucketCount.get() >= maxPerBucket) {
+            throw new TooManyScannersException(
+                    String.format(
+                            "Cannot create scanner for bucket %s: per-bucket 
limit of %d reached.",
+                            tableBucket, maxPerBucket));
+        }
+    }
+
+    /**
+     * Atomically increments the counters and puts the context in the map. 
Throws {@link
+     * TooManyScannersException} and rolls back the increments if a concurrent 
create caused either
+     * limit to be exceeded between the initial check and this call.
+     */
+    private void registerContext(ScannerContext context, TableBucket 
tableBucket) {
+        AtomicInteger bucketCount =
+                perBucketCount.computeIfAbsent(tableBucket, k -> new 
AtomicInteger(0));
+
+        int newTotal = totalScanners.incrementAndGet();
+        if (newTotal > maxPerServer) {
+            totalScanners.decrementAndGet();
+            throw new TooManyScannersException(
+                    String.format(
+                            "Cannot create scanner for bucket %s: server-wide 
limit of %d reached.",
+                            tableBucket, maxPerServer));
+        }
+
+        int newBucketCount = bucketCount.incrementAndGet();
+        if (newBucketCount > maxPerBucket) {
+            bucketCount.decrementAndGet();
+            totalScanners.decrementAndGet();
+            throw new TooManyScannersException(
+                    String.format(
+                            "Cannot create scanner for bucket %s: per-bucket 
limit of %d reached.",
+                            tableBucket, maxPerBucket));
+        }
+
+        scanners.put(context.getId(), context);
+
+        LOG.debug(
+                "Registered scanner {} for bucket {} (total={}, perBucket={})",
+                context.getId(),
+                tableBucket,
+                newTotal,
+                newBucketCount);
+    }
+
+    /** TTL reaper — invoked periodically by the background scheduler. */
+    private void cleanupExpiredScanners() {
+        long now = clock.milliseconds();
+
+        // Prune stale entries from the recently-expired cache to bound memory 
usage.
+        recentlyExpiredIds
+                .entrySet()
+                .removeIf(e -> now - e.getValue() > 
recentlyExpiredRetentionMs);
+
+        for (Map.Entry<String, ScannerContext> entry : scanners.entrySet()) {
+            ScannerContext context = entry.getValue();
+            if (context.isExpired(scannerTtlMs, now)) {
+                // Conditional remove prevents double-close if removeScanner() 
fires concurrently.
+                if (scanners.remove(entry.getKey(), context)) {
+                    LOG.info(
+                            "Evicted idle scanner {} for bucket {} (idle > 
{}ms).",
+                            entry.getKey(),
+                            context.getTableBucket(),
+                            scannerTtlMs);
+                    recentlyExpiredIds.put(entry.getKey(), now);
+                    decrementCounts(context.getTableBucket());
+                    closeScannerContext(context);
+                }
+            }
+        }
+    }
+
+    private void decrementCounts(TableBucket bucket) {
+        totalScanners.decrementAndGet();
+        AtomicInteger count = perBucketCount.get(bucket);
+        if (count != null) {
+            count.decrementAndGet();
+        }
+    }
+

Review Comment:
   `perBucketCount` entries are never removed when the count drops to 0. Over 
time (e.g., many partitions/buckets being scanned once), this can grow without 
bound. Consider removing the `AtomicInteger` entry when it reaches 0 (e.g., 
`computeIfPresent` and remove-on-zero) to keep the map bounded.
   ```suggestion
           perBucketCount.computeIfPresent(
                   bucket,
                   (b, count) -> count.decrementAndGet() == 0 ? null : count);
       }
   ```
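
One caveat with remove-on-zero, offered as a sketch rather than a definitive fix: `registerContext` obtains the `AtomicInteger` via `computeIfAbsent` and increments it outside the map operation, so an increment could land on a counter that a concurrent decrement has just removed from the map. A way to avoid that is to do both the increment and the decrement inside the map's atomic per-key operations. `BoundedCounters` below is a hypothetical helper, not part of the PR, and stores plain `Integer` values instead of `AtomicInteger`.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: per-key counters whose entries disappear when they reach zero. */
final class BoundedCounters<K> {
    private final ConcurrentHashMap<K, Integer> counts = new ConcurrentHashMap<>();

    /** Increments the count for key unless it is already at max; returns true on success. */
    boolean tryIncrement(K key, int max) {
        boolean[] acquired = {false};
        // compute() runs atomically per key, so check and increment cannot interleave
        // with a concurrent decrement that removes the entry.
        counts.compute(key, (k, current) -> {
            int value = current == null ? 0 : current;
            if (value >= max) {
                return current; // limit reached: leave the entry unchanged
            }
            acquired[0] = true;
            return value + 1;
        });
        return acquired[0];
    }

    /** Decrements the count for key and removes the entry when it reaches zero. */
    void decrement(K key) {
        counts.computeIfPresent(key, (k, current) -> current <= 1 ? null : current - 1);
    }

    int get(K key) {
        return counts.getOrDefault(key, 0);
    }

    int trackedKeys() {
        return counts.size();
    }
}
```

With this shape the per-bucket limit check and the increment happen in one step, so the separate pre-check and rollback would only be needed for the server-wide counter.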



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
