Hi Jark, Thank you for your input and for providing more context... I updated the FIP based on your suggestions.
I was debating the API design, and my new design didn't take into account the old design with the BatchScanner. I will follow the old design since it's more consistent with what we have now. Best, Giannis On Thu, Mar 26, 2026 at 5:50 PM Jark Wu <[email protected]> wrote: > Hi Lorenzo, > > I can help clarify some of the design decisions, as I was involved in > the initial design of this FIP. > > (1) paginated requests with snapshot ID > > The current design of this FIP is exactly a paginated request with a > snapshot ID. When a new scan request is initiated, the server takes a > snapshot, and all subsequent streaming scan requests retrieve > paginated data from that specific snapshot. > > The only distinction lies in the pagination mechanism. Your proposal > uses keys, whereas the FIP utilizes `call_seq_id` (effectively a page > ID). Adopting `call_seq_id` is a simpler and more efficient > implementation. Using keys would be heavier, as they may require > serializing multiple fields, significantly increasing the request > payload size. > > > (2) true streaming RPC > IIUC, your design proposes a push-based model, whereas this FIP adopts > a pull-based model, consistent with the architecture of Fluss and > Kafka (e.g., in `FetchLog`). We chose the pull model for two key > reasons: > > * Flow Control: In a push model, the server dictates the rate, > risking consumer overload and causing backpressure that can exhaust > server threads. The pull model allows clients to fetch data only when > they are ready, naturally preventing congestion. > * Replayability: Handling network failures is simpler with pulling. > Clients can easily re-fetch missing data using a `page_id` or log > offset. In contrast, managing state, offsets, and varying consumer > speeds for thousands of clients in a push model adds significant > complexity, effectively turning the server into a stateful dispatcher.
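The pull-based, page-id pagination described above can be sketched in miniature as follows. All names here (`KvScanServer`, `fetchPage`, `scanAll`) are illustrative stand-ins, not Fluss APIs, and an immutable in-memory list stands in for the pinned RocksDB snapshot:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a pull-based paginated scan: the client drives the scan by
// asking for explicit page ids (the role of `call_seq_id`), so a lost
// response can simply be re-fetched. Hypothetical names, not Fluss code.
public class PullScanSketch {

    // Server side: a frozen view of the data, paged by call_seq_id.
    static class KvScanServer {
        private final List<String> snapshot; // taken when the scan opened
        private final int pageSize;

        KvScanServer(List<String> data, int pageSize) {
            this.snapshot = List.copyOf(data); // immutable: snapshot isolation
            this.pageSize = pageSize;
        }

        // Idempotent: fetching the same page twice returns the same rows.
        List<String> fetchPage(int callSeqId) {
            int from = callSeqId * pageSize;
            if (from >= snapshot.size()) return List.of(); // scan exhausted
            return snapshot.subList(from, Math.min(from + pageSize, snapshot.size()));
        }
    }

    // Client side: pull pages until the server returns an empty batch.
    static List<String> scanAll(KvScanServer server) {
        List<String> out = new ArrayList<>();
        for (int seq = 0; ; seq++) {
            List<String> page = server.fetchPage(seq);
            if (page.isEmpty()) break;
            out.addAll(page);
        }
        return out;
    }

    public static void main(String[] args) {
        KvScanServer server = new KvScanServer(List.of("k1", "k2", "k3", "k4", "k5"), 2);
        System.out.println(scanAll(server));     // [k1, k2, k3, k4, k5]
        System.out.println(server.fetchPage(1)); // [k3, k4] -- replayable page
    }
}
```

Because `fetchPage` is idempotent over a frozen view, a client that loses a response can re-issue the same page id, which is the replayability property Jark describes.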
> > Best, > Jark > > > > On Fri, 27 Mar 2026 at 00:28, Jark Wu <[email protected]> wrote: > > > > Hi Giannis, > > > > Thanks for driving this. I only have some questions on the FIP. > > > > (1) Scan API > > The FIP says "Remove the LIMIT requirement from > > TableScan.createBatchScanner(TableBucket) for PK tables" which reuses > > the existing "Table.newScan.createBatchScan(..)" API. But the Public > > Interface section introduces a new API "Table.newKvScan.execute()". > > > > I believe extending the existing "Table.newScan().createBatchScan()" > > to support non-LIMIT queries would provide a more unified approach. > > Since this API already handles KV scan functionality, introducing a > > separate KvScan interface could confuse users regarding which method > > to choose. > > > > (2) The protobuf definition currently uses `uint32` and `uint64` for > > several fields. Since Java lacks native support for unsigned integers, > > this forces us to rely on `Integer.toUnsignedLong()` wrappers > > throughout the codebase, adding unnecessary complexity. Switching to > > `int32` and `int64` would significantly simplify the logic and improve > > code readability. > > > > (3) Configuration. > > I have a minor suggestion regarding the configuration naming. Since > > this scanner is designed exclusively for KV tables, and our existing > > server-side KV configurations consistently use the kv.* prefix, I > > recommend changing server.scanner.* to kv.scanner.*. This adjustment > > will ensure better consistency with our current naming conventions. > > > > Best, > > Jark > > > > On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev > > <[email protected]> wrote: > > > > > > Ehy Giannis! Sorry for the late reply. 
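As a side note, the unsigned-integer friction from Jark's point (2) can be seen in two lines of plain Java: a protobuf `uint32` value above `Integer.MAX_VALUE` surfaces as a negative Java `int`, so every read site needs an `Integer.toUnsignedLong()` wrapper to recover the intended value.

```java
// Illustration of why uint32/uint64 protobuf fields are awkward in Java:
// the raw wire value looks negative until explicitly widened as unsigned.
public class UnsignedSketch {
    public static void main(String[] args) {
        int wireValue = (int) 3_000_000_000L; // a uint32 value as it lands in a Java int
        System.out.println(wireValue);                         // -1294967296 (appears negative)
        System.out.println(Integer.toUnsignedLong(wireValue)); // 3000000000 (intended value)
    }
}
```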
> > > > > > *Is the ScannerManager / session state actually required?* > > > > > > *The key thing with this design is snapshot isolation across the entire > > > bucket scan.* > > > *A ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, > and > > > all subsequent batches read from that snapshot.* > > > > > > I understand; however, with the same approach, I see easier ways to > > > implement this. For example, make the client reply not only with a > > > "last_key" but also with a "snapshot_id". > > > The server would need to store the open snapshots (this is the stateful > > > part) to re-use them upon request. > > > You can drop the iterator, but hold a pointer to the snapshot; this > should > > > tell RocksDB not to compact it out as the snapshot is not released. > > > > > > This approach would drastically reduce server-side complexity and hold > > > significantly less state. > > > I think this is worth exploring and, if it really proves impossible, added > as a > > > rejected alternative. > > > > > > > > > *Could it be a single streaming RPC?* > > > > > > *Since the Fluss RPC layer is a custom Netty-based request/response > > > protocol, not gRPC, introducing a server-streaming primitive would > require > > > many changes and effort.* > > > > > > I understand; however, missing such a primitive still impacts Fluss, > and it > > > seems to me that we are still paying the cost of implementing a > custom > > > streaming RPC for a special case. > > > It may be worth investing in the transport primitive once rather than > > > duplicating this pattern every time a streaming operation is needed. > > > One example is log tailing, moving from polling to a true streaming RPC. Another > > > example is changelog streaming. > > > > > > What if we opt for a streaming RPC FIP before this one so that this one can > > > directly benefit from it? 
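Lorenzo's middle-ground idea (pin a snapshot, drop the iterator, resume from the last key) could look roughly like the sketch below. All names are hypothetical, and a `TreeMap` copy stands in for a pinned RocksDB snapshot; in RocksDB, `openSnapshot` would pin a real snapshot rather than copy data:

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a server that keeps only a registry of pinned snapshots
// (no open iterators). Each request carries (snapshot_id, last_key) and
// is otherwise stateless: the server seeks past last_key in the pinned view.
public class SnapshotRegistrySketch {
    private final AtomicLong ids = new AtomicLong();
    // snapshot_id -> immutable view pinned at scan-open time
    private final ConcurrentMap<Long, NavigableMap<String, String>> open =
            new ConcurrentHashMap<>();

    long openSnapshot(NavigableMap<String, String> live) {
        long id = ids.incrementAndGet();
        open.put(id, new TreeMap<>(live)); // RocksDB would pin, not copy
        return id;
    }

    // Per-request: return up to `limit` keys strictly after last_key
    // (null means "from the beginning") inside the pinned snapshot.
    List<String> fetch(long snapshotId, String lastKey, int limit) {
        NavigableMap<String, String> snap = open.get(snapshotId);
        if (snap == null) throw new IllegalStateException("unknown snapshot");
        NavigableMap<String, String> tail =
                (lastKey == null) ? snap : snap.tailMap(lastKey, false);
        return tail.keySet().stream().limit(limit).toList();
    }

    void release(long snapshotId) { open.remove(snapshotId); } // unpin

    public static void main(String[] args) {
        SnapshotRegistrySketch reg = new SnapshotRegistrySketch();
        NavigableMap<String, String> live = new TreeMap<>();
        live.put("k1", "a"); live.put("k2", "b"); live.put("k3", "c");
        long id = reg.openSnapshot(live);
        live.put("k0", "late"); // a write arriving after the snapshot was pinned
        System.out.println(reg.fetch(id, null, 2));  // [k1, k2] -- k0 invisible
        System.out.println(reg.fetch(id, "k2", 10)); // [k3]
        reg.release(id);
    }
}
```

The only server state is the snapshot registry itself, which is the trade Lorenzo points at: the iterator session disappears, but something must still decide when an unreleased snapshot may be unpinned.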
> > > > > > > > On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos <[email protected] > > > > > wrote: > > > > > > > Hi Lorenzo, > > > > > > > > And thank you for your comments. Lots of cool things to digest here, > and I > > > > would be eager to hear more people's thoughts. > > > > > > > > Regarding the API > > > > table.newScan() > > > > .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG > > > > .execute() > > > > > > > > Currently, the scan method is based on how the table was created, so I want > to > > > > differentiate between a scan (streaming consumption of a log) and a > KvScan (a > > > > bounded RocksDB full-table query). Then there is also some > functionality > > > > that we might not want to expose to the user, for example around the > > > > snapshot. This should be internal functionality. > > > > > > > > *Is the ScannerManager / session state actually required?* > > > > Pagination can definitely be a simple approach. However, the key > thing > > > > with this design is snapshot isolation across the entire bucket > scan. A > > > > ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, > and all > > > > subsequent batches read from that snapshot. This means: > > > > 1. Every key that existed at scan-open time is returned exactly once. > > > > 2. No key is duplicated because it was compacted and rewritten > between two > > > > requests. > > > > 3. No key is skipped because it was deleted and reinserted between > two > > > > requests. > > > > With cursor pagination, you lose that guarantee. > > > > > > > > > > > > *Could it be a single streaming RPC?* > > > > This would be a great approach, but my understanding is that we can't > > > > support this. Since the Fluss RPC layer is a custom Netty-based > > > > request/response protocol, not gRPC, introducing a server-streaming > > > > primitive would require many changes and effort. Let me know if I'm > > > > mistaken. 
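The isolation guarantee Giannis describes can be shown with a toy contrast between a scan pinned to a snapshot and cursor batches over the live store. The names are illustrative, and a `TreeMap` copy again stands in for a RocksDB snapshot:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Contrast sketch: a pinned snapshot returns exactly the keys that
// existed at scan-open time, while fresh-view cursor batches can
// observe writes that arrive between two requests.
public class IsolationSketch {

    // One cursor batch: up to `limit` keys strictly after `afterKey`
    // (null means "from the beginning") in the given view.
    static List<String> batch(NavigableMap<String, String> view, String afterKey, int limit) {
        NavigableMap<String, String> tail =
                (afterKey == null) ? view : view.tailMap(afterKey, false);
        return tail.keySet().stream().limit(limit).toList();
    }

    public static void main(String[] args) {
        NavigableMap<String, String> live = new TreeMap<>();
        live.put("k1", "v"); live.put("k3", "v");

        NavigableMap<String, String> pinned = new TreeMap<>(live); // "snapshot" at scan open

        live.put("k2", "v"); // concurrent write between two batch requests

        // Second batch after cursor "k1": the pinned view still returns k3,
        // while a fresh view over the live store now surfaces k2.
        System.out.println(batch(pinned, "k1", 1)); // [k3]
        System.out.println(batch(live, "k1", 1));   // [k2]
    }
}
```

Whether this matters depends on the workload: for a periodic dashboard refresh the per-batch consistency of cursor pagination may be fine, while an exactly-once full export needs the pinned snapshot.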
> > > > > > > > Best, > > > > Giannis > > > > > > > > > > > > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev < > > > > [email protected]> wrote: > > > > > > > >> Hello Giannis! > > > >> Very instructive read, I went through that and I was actually > astonished > > > >> that this was not the "default" mode that Fluss operates on PK > tables when > > > >> a snapshot is requested. > > > >> Very good that we have this initiative. > > > >> > > > >> *Is a new scanner type explicitly required?* > > > >> > > > >> I don't think it needs to be addressed now, but a good thing to > keep in > > > >> mind after this FIP gets implemented. > > > >> > > > >> > > > >> The cleaner design would arguably be to unify these under a single > > > >> abstraction where the scan source (live RocksDB vs. remote > snapshot) and > > > >> chunking behavior are implementation details, not separate scanner > > > >> classes. > > > >> The KvScan interface being introduced (table.newKvScan()) is a step > in > > > >> that > > > >> direction but it still sits alongside rather than replacing the > existing > > > >> scan path. > > > >> > > > >> A truly unified design might look like: > > > >> > > > >> table.newScan() > > > >> .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG > > > >> .execute() > > > >> > > > >> I hope we keep this in mind for future, relevant situations. > > > >> > > > >> --------- > > > >> > > > >> *Is the ScannerManager / session state actually required?* > > > >> > > > >> The session exists because of the chunked, multi-RPC design -- the > server > > > >> needs to remember where the iterator is between calls. > > > >> But is that statefulness necessary at all? 
> > > >> > > > >> The stateless alternative: cursor-based pagination > > > >> > > > >> Instead of keeping a server-side iterator session, you could do > stateless > > > >> resumption using a last-seen key as a cursor: > > > >> > > > >> Request 1: scan from beginning -> returns rows + > last_key="key_0500" > > > >> Request 2: scan from "key_0500" -> returns rows + > last_key="key_1000" > > > >> Request 3: scan from "key_1000" -> ... > > > >> > > > >> Each request is fully independent. The server opens a fresh RocksDB > > > >> iterator, seeks to the cursor key, reads a batch, and closes it. No > > > >> session, no TTL, no ScannerManager. > > > >> > > > >> Advantages: > > > >> > > > >> - Massively simpler server side -- no session lifecycle, no TTL > reaper, > > > >> no leadership fencing complexity > > > >> - Naturally resilient to server restarts and leadership changes > -- > > > >> client just retries from the last cursor > > > >> - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed > > > >> > > > >> Tradeoffs: > > > >> > > > >> - *Consistency weakens slightly* -- each batch opens a fresh > RocksDB > > > >> snapshot, so you might see a key move between batches if it was > updated > > > >> between requests. With the session approach, the entire bucket > scan is > > > >> under one snapshot. > > > >> - Seek cost -- RocksDB iterator seeks are not free, especially > across > > > >> many SST files. For very large tables with many chunks this adds > up, > > > >> though > > > >> for the small key spaces FIP-17 targets it's likely negligible. > > > >> - Cursor encoding needs care for binary keys. > > > >> > > > >> Could it be a single streaming RPC? 
> > > >> > > > >> Rather than a request/response sequence with session state, you'd > have a > > > >> single server-streaming RPC: > > > >> > > > >> client -> ScanKvRequest (open) > > > >> server -> stream of ScanKvResponse chunks > > > >> server -> closes stream when exhausted > > > >> > > > >> If this is possible, the entire ScannerManager session complexity > > > >> evaporates -- the iterator just lives for the duration of the > stream, held > > > >> naturally by the connection. > > > >> > > > >> > > > >> > > > >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee < > [email protected]> > > > >> wrote: > > > >> > > > >> > Hello Giannis, > > > >> > > > > >> > Thank you for the update to the proposal! Quickly skimmed through > and I > > > >> > like the updates that you've made! Questions / comments below: > > > >> > > > > >> > 1. You mentioned an extra section on heartbeat on the FIP, but I > do not > > > >> see > > > >> > heartbeat being mentioned in the latest version of the FIP. +1. > If the > > > >> > proposal is updated to rely solely on the last scan time for TTL and removes the > > > >> > heartbeat, it's a great change. If I remember correctly, the > previous > > > >> > version used the heartbeat as a keepalive; there is a risk of an > unclosed, > > > >> > idle scanner holding resources on the server side indefinitely and > causing > > > >> > a leak. > > > >> > > > > >> > 2. On the continuation request, should we check lastAccessTimeMs and > reject > > > >> if > > > >> > the elapsed time is larger than the TTL? Otherwise sessions can idle > between 60 > > > >> and > > > >> > 90 seconds (TTL + reaper interval). This might be exacerbated if users > configure > > > >> > a particularly high TTL and reaper interval. > > > >> > > > > >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate error > for an > > > >> expired > > > >> > scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER (renaming > > > >> > UNKNOWN_SCANNER_ID). 
These are both terminal and non-retriable; I > > > >> imagine > > > >> > that handling them from the client side would not differ. It's also a > small > > > >> > simplification to the implementation. > > > >> > > > > >> > 4. On pipelining. If the user queries for top-n every 10 seconds > to > > > >> update > > > >> > a leaderboard, would pipelining cause unnecessary extra traffic? > E.g. > > > >> they > > > >> > only care about n records, but pipelining automatically fetches up > to 8 MB. > > > >> > > > > >> > 5. Also on pipelining, while it seems that we're keeping the Flink > connector > > > >> > out of scope, IIRC the Flink split fetcher also pipelines. If we use > this to > > > >> > update the Flink connector, we'd have a higher amount buffered in the > pipeline. > > > >> > > > > >> > 6. On the expiration interval, should we hide that configuration and > choose > > > >> to > > > >> > expose it if there's a strong need for it? It's one fewer config for > users > > > >> to > > > >> > reason about, and a 30s expiration sounds like a good starting point. > > > >> > > > > >> > Best regards > > > >> > > > > >> > Keith Lee > > > >> > > > > >> > > > > >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos < > [email protected]> > > > >> > wrote: > > > >> > > > > >> > > Hi devs, > > > >> > > Let me know if there are any comments here; otherwise I would > like to > > > >> > start > > > >> > > a vote thread. > > > >> > > > > > >> > > Best, > > > >> > > Giannis > > > >> > > > > > >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos < > [email protected] > > > >> > > > > >> > > wrote: > > > >> > > > > > >> > > > Hi devs, > > > >> > > > > > > >> > > > After a long time, I would like to reinitiate the discussion > on > > > >> FIP-17. 
> > > >> > > > > > > >> > > > I made quite a few updates on the FIP, which you can find > here: > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries > > > >> > > > and updated the title to better reflect the goal. Let me know > if it > > > >> > makes > > > >> > > > sense. > > > >> > > > > > > >> > > > Moreover, at the end of the proposal, you will find a section called > > > >> *extras* > > > >> > > which > > > >> > > > has a suggestion for a heartbeat mechanism. However, during > my PoC, > > > >> I > > > >> > > found > > > >> > > > that this is not really needed, but > > > >> > > > I would like your thoughts and feedback first. > > > >> > > > > > > >> > > > Best, > > > >> > > > Giannis > > > >> > > > > > > >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos < > > > >> [email protected] > > > >> > > > > > >> > > > wrote: > > > >> > > > > > > >> > > >> Yang, thank you for your thoughtful comments. > > > >> > > >> > > > >> > > >> Indeed, we are streaming the results to the client; however, > it's > > > >> > still > > > >> > > a > > > >> > > >> batch operation. We could use "KV store (or PK table) > Snapshot > > > >> Query" > > > >> > > or > > > >> > > >> something similar, since we are querying a RocksDB snapshot. > WDYT? > > > >> > > >> The newly introduced KvBatchScanner should be able to be > reused > > > >> by > > > >> > > both > > > >> > > >> the client itself - assume a scenario where I want to > periodically > > > >> > query > > > >> > > the > > > >> > > >> full RocksDB KV store to power real-time dashboards - and > > > >> Flink > > > >> > > >> (with more engines to follow later). > > > >> > > >> It issues requests to fetch the results per bucket and > transmits > > > >> them > > > >> > > back > > > >> > > >> to the client. > > > >> > > >> > > > >> > > >> > Could you elaborate on why the new KvBatchScanner isn't > reusable? 
> > > >> > > >> I think the reasoning here is that each request creates a > new > > > >> > > >> KvBatchScanner, which polls the records and then closes > > > >> automatically. > > > >> > > Any > > > >> > > >> reason you see this as a limitation and think we should consider > making > > > >> it > > > >> > > >> reusable? > > > >> > > >> > > > >> > > >> The design mainly targets the Fluss client API. Should we > add an > > > >> > > >> integration design with Flink? Wang Cheng, WDYT? > > > >> > > >> > > > >> > > >> Best, > > > >> > > >> Giannis > > > >> > > >> > > > >> > > >> > > > >> > > >> > > > >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang < > > > >> [email protected]> > > > >> > > >> wrote: > > > >> > > >> > > > >> > > >>> Hi Cheng, > > > >> > > >>> > > > >> > > >>> Thank you for driving this excellent work! Your FIP > document shows > > > >> > > great > > > >> > > >>> thought and initiative. I've gone through it and have some > > > >> questions > > > >> > > and > > > >> > > >>> suggestions that I hope can further enhance this valuable > > > >> > contribution. > > > >> > > >>> > > > >> > > >>> 1、Regarding the Title, I believe we could consider changing > it to > > > >> > > >>> "Support > > > >> > > >>> full scan in batch mode for PrimaryKey Table". The term > > > >> "Streaming" > > > >> > > might > > > >> > > >>> cause confusion with Flink's streaming/batch modes, and this > > > >> revised > > > >> > > >>> title > > > >> > > >>> would provide better clarity. > > > >> > > >>> > > > >> > > >>> 2、In the Motivation section, I think there are two > particularly > > > >> > > important > > > >> > > >>> benefits worth highlighting: (1) OLAP engines will be able > to > > > >> perform > > > >> > > >>> full > > > >> > > >>> snapshot reads on Fluss primary-key tables. (2) This > approach can > > > >> > > replace > > > >> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss > client to > > > >> > > >>> eliminate > > > >> > > >>> its RocksDB dependency entirely. 
> > > >> > > >>> > > > >> > > >>> 3、Concerning the Proposed Changes, could you clarify when > exactly > > > >> the > > > >> > > >>> client creates a KV snapshot on the server side, and when > we send > > > >> the > > > >> > > >>> bucket_scan_req? > > > >> > > >>> > > > >> > > >>> Let me share my thinking on this: When Flink attempts to > read > > > >> from a > > > >> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the JobMaster > > > >> > generates > > > >> > > >>> HybridSnapshotLogSplit and dispatches them to SplitReaders > > > >> running on > > > >> > > the > > > >> > > >>> TaskManager. The JobMaster doesn't actually read data—it > merely > > > >> > defines > > > >> > > >>> and > > > >> > > >>> manages the splits. Therefore, we need to ensure the JM has > > > >> > sufficient > > > >> > > >>> information to determine the boundary of the KV snapshot > and the > > > >> > > >>> startOffset of the LogSplit. > > > >> > > >>> > > > >> > > >>> I suggest we explicitly create a snapshot (or as you've > termed > > > >> it, a > > > >> > > >>> new_scan_request) on the server side. This way, the > > > >> > > FlinkSourceEnumerator > > > >> > > >>> can use it to define a HybridSnapshotLogSplit, and the > > > >> SplitReaders > > > >> > can > > > >> > > >>> perform pollBatch operations on this snapshot (which would > be > > > >> bound > > > >> > to > > > >> > > >>> the > > > >> > > >>> specified scanner_id). > > > >> > > >>> > > > >> > > >>> 4、 Could you elaborate on why the new KvBatchScanner isn't > > > >> reusable? > > > >> > > >>> What's > > > >> > > >>> the reasoning behind this limitation? (I believe RocksDB > > > >> iterators do > > > >> > > >>> support the seekToFirst operation.) If a TaskManager fails > over > > > >> > before > > > >> > > a > > > >> > > >>> checkpoint, rescanning an existing snapshot seems like a > natural > > > >> > > >>> requirement. 
> > > >> > > >>> > > > >> > > >>> 5、I think it would be beneficial to include some detailed > design > > > >> > > aspects > > > >> > > >>> regarding Flink's integration with the new BatchScanner. > > > >> > > >>> > > > >> > > >>> Overall, this is a solid foundation for an important > enhancement. > > > >> > > Looking > > > >> > > >>> forward to discussing these points further! > > > >> > > >>> > > > >> > > >>> Best regards, Yang > > > >> > > >>> > > > >> > > >>> Wang Cheng <[email protected]> wrote on Wed, 22 Oct 2025 at > 17:09: > > > >> > > >>> > > > >> > > >>> > Hi all, > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and > > > >> limit KV > > > >> > > >>> batch > > > >> > > >>> > scan. The former approach is constrained by snapshot > > > >> availability > > > >> > and > > > >> > > >>> > remote storage performance, while the latter one is only > > > >> applicable > > > >> > to > > > >> > > >>> > queries with a LIMIT clause and risks high memory pressure. > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > To address those limitations, Giannis Polyzos and I are > writing > > > >> to > > > >> > > >>> propose > > > >> > > >>> > FIP-17: a general-purpose streaming KV scan for Fluss [1]. > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > Any feedback and suggestions on this proposal are welcome! 
> > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > [1]: > > > >> > > >>> > > > > >> > > >>> > > > >> > > > > > >> > > > > >> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC > > > >> > > >>> > > > > >> > > >>> > Regards, > > > >> > > >>> > Cheng > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > > > > >> > > >>> > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > >> > > > >> -- > > > >> Lorenzo Affetti > > > >> Senior Software Engineer @ Flink Team > > > >> Ververica <http://www.ververica.com> > > > >> > > > > > > > > > > -- > > > Lorenzo Affetti > > > Senior Software Engineer @ Flink Team > > > Ververica <http://www.ververica.com> >
