Hi Jark,

Thank you for your input and for providing more context. I updated the
FIP based on your suggestions.

I was debating the API design, and my new design didn't take the old
BatchScanner design into account. I will follow the old design since
it's more consistent with what we have now.

Best,
Giannis

On Thu, Mar 26, 2026 at 5:50 PM Jark Wu <[email protected]> wrote:

> Hi Lorenzo,
>
> I can help clarify some of the design decisions, as I was involved in
> the initial design of this FIP.
>
> (1) paginated requests with snapshot ID
>
> The current design of this FIP is exactly a paginated request with a
> snapshot ID. When a new scan request is initiated, the server takes a
> snapshot, and all subsequent streaming scan requests retrieve
> paginated data from that specific snapshot.
>
> The only distinction lies in the pagination mechanism. Your proposal
> uses keys, whereas the FIP utilizes `call_seq_id` (effectively a page
> ID). Adopting `call_seq_id` is a simpler and more efficient
> implementation. Using keys would be heavier, as they may require
> serializing multiple fields, significantly increasing the request
> payload size.
>
>
> (2) true streaming RPC
> IIUC, your design proposes a push-based model, whereas this FIP adopts
> a pull-based model, consistent with the architecture of Fluss and
> Kafka (e.g., in `FetchLog`). We chose the pull model for two key
> reasons:
>
> *   Flow Control: In a push model, the server dictates the rate,
> risking consumer overload and causing backpressure that can exhaust
> server threads. The pull model allows clients to fetch data only when
> they are ready, naturally preventing congestion.
> *   Replayability: Handling network failures is simpler with pulling.
> Clients can easily re-fetch missing data using a `page_id` or log
> offset. In contrast, managing state, offsets, and varying consumer
> speeds for thousands of clients in a push model adds significant
> complexity, effectively turning the server into a stateful dispatcher.
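
[To make the pull model concrete, here is a minimal sketch. The names
(`fetchPage`, `PAGES`, `scan`) are illustrative only, not the Fluss RPC API;
a list of lists stands in for a server-side snapshot frozen at scan-open time.]

```java
import java.util.ArrayList;
import java.util.List;

// Pull-model sketch: the client drives the pace, fetching one page per
// request by page id, and recovers from a failure by simply re-requesting
// the same page. (Illustrative only -- not the actual Fluss API.)
public class PullScanDemo {

    // Stand-in for the server: pages frozen at "snapshot" time.
    static final List<List<String>> PAGES = List.of(
            List.of("k1", "k2"), List.of("k3", "k4"), List.of("k5"));

    // Server handler for one pull request, addressed by a call_seq_id-style page id.
    static List<String> fetchPage(int pageId) {
        return pageId < PAGES.size() ? PAGES.get(pageId) : List.of();
    }

    // Client loop: pull the next page only when ready; a retry is just
    // calling fetchPage again with the same pageId.
    static List<String> scan() {
        List<String> rows = new ArrayList<>();
        int pageId = 0;
        while (true) {
            List<String> page = fetchPage(pageId);
            if (page.isEmpty()) break; // snapshot exhausted
            rows.addAll(page);
            pageId++;                  // cursor advances client-side
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(scan()); // all rows, in page order
    }
}
```

[Note how the server never tracks per-client progress: the page id in each
request is the only cursor, which is what makes replay after a network
failure trivial.]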
>
> Best,
> Jark
>
>
>
> On Fri, 27 Mar 2026 at 00:28, Jark Wu <[email protected]> wrote:
> >
> > Hi Giannis,
> >
> > Thanks for driving this. I only have some questions on the FIP.
> >
> > (1) Scan API
> > The FIP says "Remove the LIMIT requirement from
> > TableScan.createBatchScanner(TableBucket) for PK tables", which reuses
> > the existing "Table.newScan.createBatchScan(..)" API. But the Public
> > Interface section introduces a new API, "Table.newKvScan.execute()".
> >
> > I believe extending the existing "Table.newScan().createBatchScan()"
> > to support non-LIMIT queries would provide a more unified approach.
> > Since this API already handles KV scan functionality, introducing a
> > separate KvScan interface could confuse users regarding which method
> > to choose.
> >
> > (2) The protobuf definition currently uses `uint32` and `uint64` for
> > several fields. Since Java lacks native support for unsigned integers,
> > this forces us to rely on `Integer.toUnsignedLong()` wrappers
> > throughout the codebase, adding unnecessary complexity. Switching to
> > `int32` and `int64` would significantly simplify the logic and improve
> > code readability.
> >
> > (3) Configuration.
> > I have a minor suggestion regarding the configuration naming. Since
> > this scanner is designed exclusively for KV tables, and our existing
> > server-side KV configurations consistently use the kv.* prefix, I
> > recommend changing server.scanner.* to kv.scanner.*. This adjustment
> > will ensure better consistency with our current naming conventions.
> >
> > Best,
> > Jark
> >
> > On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev
> > <[email protected]> wrote:
> > >
> > > Hey Giannis! Sorry for the late reply.
> > >
> > > *Is the ScannerManager / session state actually required?*
> > >
> > > *The key thing with this design is snapshot isolation across the entire
> > > bucket scan.*
> > > *A ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, and
> > > all subsequent batches read from that snapshot.*
> > >
> > > I understand; however, with the same approach, I see easier ways to
> > > implement this. For example, make the client reply not only with a
> > > "last_key" but also with a "snapshot_id".
> > > The server would need to store the open snapshots (this is the stateful
> > > part) to re-use them upon request.
> > > You can drop the iterator but hold a pointer to the snapshot; this should
> > > tell RocksDB not to compact it out, as the snapshot is not released.
> > >
> > > This approach would drastically reduce server-side complexity and hold
> > > significantly less state.
> > > I think this is worth exploring and, if it really is impossible, should be
> > > added as a rejected alternative.
> > >
> > >
> > > *Could it be a single streaming RPC?*
> > >
> > > *Since the Fluss RPC layer is a custom Netty-based request/response
> > > protocol, not gRPC, introducing a server-streaming primitive would require
> > > many changes and effort.*
> > >
> > > I understand; however, missing such a primitive still impacts Fluss, and it
> > > seems to me that we are still paying the cost of implementing a custom
> > > streaming RPC for a special case here.
> > > It may be worth investing in the transport primitive once rather than
> > > duplicating this pattern every time a streaming operation is needed.
> > > One example is log tailing, from polling to true streaming RPC. Another
> > > example is changelog streaming.
> > >
> > > What if we opt for a streaming RPC FIP before this one, so that this can
> > > directly benefit from it?
> > >
> > >
> > > On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos <[email protected]>
> > > wrote:
> > >
> > > > Hi Lorenzo,
> > > >
> > > > And thank you for your comments. Lots of cool things to digest here, and I
> > > > would be eager to hear more people's thoughts.
> > > >
> > > > Regarding the API
> > > > table.newScan()
> > > >      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
> > > >      .execute()
> > > >
> > > > Currently, the scan method is based on the table creation, so I want to
> > > > differentiate between a scan (streaming consumption of a log) and a
> > > > KvScan (a bounded RocksDB full-table query). Then there is also some
> > > > functionality that we might not want to expose to the user, for example
> > > > about the snapshot. This should be internal functionality.
> > > >
> > > > *Is the ScannerManager / session state actually required?*
> > > > Pagination can definitely be a simple approach. However, the key thing
> > > > with this design is snapshot isolation across the entire bucket scan. A
> > > > ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, and all
> > > > subsequent batches read from that snapshot. This means:
> > > > 1. Every key that existed at scan-open time is returned exactly once.
> > > > 2. No key is duplicated because it was compacted and rewritten between two
> > > > requests.
> > > > 3. No key is skipped because it was deleted and reinserted between two
> > > > requests.
> > > > With cursor pagination, you lose that guarantee.
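
[The three guarantees above can be demonstrated with a small sketch. A frozen
TreeMap copy stands in for a pinned RocksDB snapshot here; the class and
method names (`SnapshotScanDemo`, `scanWithSnapshot`) are made up for
illustration, not Fluss code.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Why one snapshot per scan matters: mutations that happen between two
// batch requests do not leak into the scan.
public class SnapshotScanDemo {

    static List<String> scanWithSnapshot(NavigableMap<String, String> store) {
        // "Open" the scan: pin a snapshot of the current state.
        NavigableMap<String, String> snapshot = new TreeMap<>(store);

        List<String> seen = new ArrayList<>();
        // Batch 1 reads the first half of the snapshot...
        snapshot.keySet().stream().limit(2).forEach(seen::add);
        // ...a writer deletes a key and inserts a new one between requests...
        store.remove("b");
        store.put("z", "new");
        // ...batch 2 still reads the rest of the pinned snapshot.
        snapshot.keySet().stream().skip(2).forEach(seen::add);
        return seen; // exactly the keys that existed at scan-open time
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>(
                Map.of("a", "1", "b", "2", "c", "3", "d", "4"));
        System.out.println(scanWithSnapshot(store)); // [a, b, c, d]
    }
}
```

[With per-batch snapshots instead, batch 2 would miss "b" and pick up "z",
which is exactly the duplication/skipping hazard the numbered list describes.]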
> > > >
> > > >
> > > > *Could it be a single streaming RPC?*
> > > > This would be a great approach, but my understanding is that we can't
> > > > support this. Since the Fluss RPC layer is a custom Netty-based
> > > > request/response protocol, not gRPC, introducing a server-streaming
> > > > primitive would require many changes and effort. Let me know if I'm
> > > > mistaken.
> > > >
> > > > Best,
> > > > Giannis
> > > >
> > > >
> > > > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev <
> > > > [email protected]> wrote:
> > > >
> > > >> Hello Giannis!
> > > >> Very instructive read. I went through it and was actually astonished
> > > >> that this was not the "default" mode Fluss operates in on PK tables
> > > >> when a snapshot is requested.
> > > >> It's very good that we have this initiative.
> > > >>
> > > >> *Is a new scanner type explicitly required?*
> > > >>
> > > >> I don't think it needs to be addressed now, but it's a good thing to
> > > >> keep in mind after this FIP gets implemented.
> > > >>
> > > >>
> > > >> The cleaner design would arguably be to unify these under a single
> > > >> abstraction where the scan source (live RocksDB vs. remote snapshot) and
> > > >> chunking behavior are implementation details, not separate scanner
> > > >> classes.
> > > >> The KvScan interface being introduced (table.newKvScan()) is a step in
> > > >> that direction, but it still sits alongside rather than replacing the
> > > >> existing scan path.
> > > >>
> > > >> A truly unified design might look like:
> > > >>
> > > >> table.newScan()
> > > >>      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
> > > >>      .execute()
> > > >>
> > > >> I hope we keep this in mind for future, relevant situations.
> > > >>
> > > >> ---------
> > > >>
> > > >> *Is the ScannerManager / session state actually required?*
> > > >>
> > > >> The session exists because of the chunked, multi-RPC design -- the server
> > > >> needs to remember where the iterator is between calls.
> > > >> But is that statefulness necessary at all?
> > > >>
> > > >> The stateless alternative: cursor-based pagination
> > > >>
> > > >> Instead of keeping a server-side iterator session, you could do stateless
> > > >> resumption using a last-seen key as a cursor:
> > > >>
> > > >> Request 1: scan from beginning   -> returns rows + last_key="key_0500"
> > > >> Request 2: scan from "key_0500"  -> returns rows + last_key="key_1000"
> > > >> Request 3: scan from "key_1000"  -> ...
> > > >>
> > > >> Each request is fully independent. The server opens a fresh RocksDB
> > > >> iterator, seeks to the cursor key, reads a batch, and closes it. No
> > > >> session, no TTL, no ScannerManager.
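
[The stateless cursor scheme above can be sketched in a few lines. A TreeMap
stands in for RocksDB; the names (`CursorScanDemo`, `scanFrom`, `scanAll`)
are hypothetical, purely to illustrate the request shape.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Stateless cursor-based pagination sketch: each "request" opens a fresh
// view of the store, seeks past the cursor key, and returns one batch.
// No server-side session survives between calls.
public class CursorScanDemo {

    static final NavigableMap<String, String> STORE = new TreeMap<>();
    static {
        for (int i = 1; i <= 9; i++) {
            STORE.put(String.format("key_%04d", i), "value-" + i);
        }
    }

    // One fully independent request: seek to the cursor, read a batch, done.
    static List<String> scanFrom(String cursor, int batchSize) {
        NavigableMap<String, String> view =
                (cursor == null) ? STORE : STORE.tailMap(cursor, /* inclusive= */ false);
        List<String> batch = new ArrayList<>();
        for (String key : view.keySet()) {
            if (batch.size() == batchSize) break;
            batch.add(key);
        }
        return batch;
    }

    // Client loop: resume each request from the last key of the previous batch.
    static List<String> scanAll(int batchSize) {
        List<String> all = new ArrayList<>();
        String cursor = null;
        while (true) {
            List<String> batch = scanFrom(cursor, batchSize);
            if (batch.isEmpty()) break;
            all.addAll(batch);
            cursor = batch.get(batch.size() - 1); // the "last_key" cursor
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(scanAll(4)); // 9 keys, fetched in batches of 4, 4, 1
    }
}
```

[Each call to `scanFrom` is retry-safe: repeating it with the same cursor
returns the same batch, which is the resilience property claimed below.]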
> > > >>
> > > >> Advantages:
> > > >>
> > > >>    - Massively simpler server side -- no session lifecycle, no TTL reaper,
> > > >>    no leadership fencing complexity
> > > >>    - Naturally resilient to server restarts and leadership changes -- the
> > > >>    client just retries from the last cursor
> > > >>    - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed
> > > >>
> > > >> Tradeoffs:
> > > >>
> > > >>    - *Consistency weakens slightly* -- each batch opens a fresh RocksDB
> > > >>    snapshot, so you might see a key move between batches if it was updated
> > > >>    between requests. With the session approach, the entire bucket scan is
> > > >>    under one snapshot.
> > > >>    - Seek cost -- RocksDB iterator seeks are not free, especially across
> > > >>    many SST files. For very large tables with many chunks this adds up,
> > > >>    though for the small key spaces FIP-17 targets it's likely negligible.
> > > >>    - Cursor encoding needs care for binary keys.
> > > >>
> > > >> *Could it be a single streaming RPC?*
> > > >>
> > > >> Rather than a request/response sequence with session state, you'd have a
> > > >> single server-streaming RPC:
> > > >>
> > > >> client -> ScanKvRequest (open)
> > > >> server -> stream of ScanKvResponse chunks
> > > >> server -> closes stream when exhausted
> > > >>
> > > >> If this is possible, the entire ScannerManager session complexity
> > > >> evaporates -- the iterator just lives for the duration of the stream, held
> > > >> naturally by the connection.
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee <[email protected]>
> > > >> wrote:
> > > >>
> > > >> > Hello Giannis,
> > > >> >
> > > >> > Thank you for the update to the proposal! I quickly skimmed through
> > > >> > and I like the updates that you’ve made! Questions / comments below:
> > > >> >
> > > >> > 1. You mentioned an extra section on heartbeat on the FIP, but I do not
> > > >> > see heartbeat mentioned in the latest version of the FIP? +1 if the
> > > >> > proposal is updated to rely solely on the last scan for TTL and remove
> > > >> > heartbeat; it’s a great change. If I remember correctly, the previous
> > > >> > version used heartbeat as a keepalive; there, an unclosed, idle scanner
> > > >> > risks holding resources on the server side indefinitely and causing a
> > > >> > leak.
> > > >> >
> > > >> > 2. On the continuation request, should we check lastAccessTimeMs and
> > > >> > reject if the elapsed time is larger than the TTL? Otherwise sessions
> > > >> > can idle between 60 and 90 seconds (TTL + reaper interval). This might
> > > >> > be exacerbated if users configure a particularly high TTL and reaper
> > > >> > interval.
> > > >> >
> > > >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate error for an
> > > >> > expired scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER
> > > >> > (renaming UNKNOWN_SCANNER_ID). These are both terminal and
> > > >> > non-retriable; I imagine that handling them from the client side would
> > > >> > not differ. It’s also a small simplification to the implementation.
> > > >> >
> > > >> > 4. On pipelining. If the user queries for the top n every 10 seconds to
> > > >> > update a leaderboard, would pipelining cause higher unnecessary
> > > >> > traffic? E.g. they only care about n records, but pipelining
> > > >> > automatically fetches up to 8 MB.
> > > >> >
> > > >> > 5. Also on pipelining: while it seems that we’re keeping the Flink
> > > >> > connector out of scope, IIRC the Flink split fetcher also pipelines. If
> > > >> > we use this to update the Flink connector, we’d have a higher amount
> > > >> > buffered in the pipeline.
> > > >> >
> > > >> > 6. On the expiration interval, should we hide that configuration and
> > > >> > choose to expose it only if there’s a strong need for it? That's one
> > > >> > fewer config for users to reason about, and a 30s expiration sounds
> > > >> > like a good starting point.
> > > >> >
> > > >> > Best regards
> > > >> >
> > > >> > Keith Lee
> > > >> >
> > > >> >
> > > >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <[email protected]>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi devs,
> > > >> > > Let me know if there are any comments here; otherwise I would like to
> > > >> > > start a vote thread.
> > > >> > >
> > > >> > > Best,
> > > >> > > Giannis
> > > >> > >
> > > >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <[email protected]>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi devs,
> > > >> > > >
> > > >> > > > After a long time, I would like to reinitiate the discussion on
> > > >> > > > FIP-17.
> > > >> > > >
> > > >> > > > I made quite a few updates to the FIP, which you can find here:
> > > >> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
> > > >> > > > and updated the title to better reflect the goal. Let me know if it
> > > >> > > > makes sense.
> > > >> > > >
> > > >> > > > Moreover, at the end of the proposal, you will find a section called
> > > >> > > > *extras* which has a suggestion for a heartbeat mechanism. However,
> > > >> > > > during my PoC, I found that this is not really needed, but
> > > >> > > > I would like your thoughts and feedback first.
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Giannis
> > > >> > > >
> > > >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <[email protected]>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Yang, thank you for your thoughtful comments.
> > > >> > > >>
> > > >> > > >> Indeed, we are streaming the results to the client; however, it's
> > > >> > > >> still a batch operation. We could use "KV store (or PK table)
> > > >> > > >> Snapshot Query" or something similar, since we are querying a
> > > >> > > >> RocksDB snapshot. WDYT?
> > > >> > > >> The newly introduced KvBatchScanner should be reusable from both
> > > >> > > >> the client itself - assume a scenario where I want to periodically
> > > >> > > >> query the full RocksDB KV store to power real-time dashboards - as
> > > >> > > >> well as Flink (with more engines to follow later).
> > > >> > > >> It issues requests to fetch the results per bucket and transmits
> > > >> > > >> them back to the client.
> > > >> > > >>
> > > >> > > >> > Could you elaborate on why the new KvBatchScanner isn't reusable?
> > > >> > > >> I think the reasoning here is that each request creates a new
> > > >> > > >> KvBatchScanner, which polls the records and then closes
> > > >> > > >> automatically. Any reason you see this as a limitation, and should
> > > >> > > >> we consider making it reusable?
> > > >> > > >>
> > > >> > > >> The design aims mainly at the Fluss client API. Should we add an
> > > >> > > >> integration design with Flink? Wang Cheng, WDYT?
> > > >> > > >>
> > > >> > > >> Best,
> > > >> > > >> Giannis
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang <[email protected]>
> > > >> > > >> wrote:
> > > >> > > >>
> > > >> > > >>> Hi Cheng,
> > > >> > > >>>
> > > >> > > >>> Thank you for driving this excellent work! Your FIP document shows
> > > >> > > >>> great thought and initiative. I've gone through it and have some
> > > >> > > >>> questions and suggestions that I hope can further enhance this
> > > >> > > >>> valuable contribution.
> > > >> > > >>>
> > > >> > > >>> 1. Regarding the title, I believe we could consider changing it to
> > > >> > > >>> "Support full scan in batch mode for PrimaryKey Table". The term
> > > >> > > >>> "Streaming" might cause confusion with Flink's streaming/batch
> > > >> > > >>> modes, and this revised title would provide better clarity.
> > > >> > > >>>
> > > >> > > >>> 2. In the Motivation section, I think there are two particularly
> > > >> > > >>> important benefits worth highlighting: (1) OLAP engines will be
> > > >> > > >>> able to perform full snapshot reads on Fluss primary-key tables.
> > > >> > > >>> (2) This approach can replace the current KvSnapshotBatchScanner,
> > > >> > > >>> allowing the Fluss client to eliminate its RocksDB dependency
> > > >> > > >>> entirely.
> > > >> > > >>>
> > > >> > > >>> 3. Concerning the Proposed Changes, could you clarify when exactly
> > > >> > > >>> the client creates a KV snapshot on the server side, and when we
> > > >> > > >>> send the bucket_scan_req?
> > > >> > > >>>
> > > >> > > >>> Let me share my thinking on this: when Flink attempts to read from
> > > >> > > >>> a PrimaryKey table, the FlinkSourceEnumerator in the JobMaster
> > > >> > > >>> generates HybridSnapshotLogSplits and dispatches them to
> > > >> > > >>> SplitReaders running on the TaskManager. The JobMaster doesn't
> > > >> > > >>> actually read data -- it merely defines and manages the splits.
> > > >> > > >>> Therefore, we need to ensure the JM has sufficient information to
> > > >> > > >>> determine the boundary of the KV snapshot and the startOffset of
> > > >> > > >>> the LogSplit.
> > > >> > > >>>
> > > >> > > >>> I suggest we explicitly create a snapshot (or, as you've termed
> > > >> > > >>> it, a new_scan_request) on the server side. This way, the
> > > >> > > >>> FlinkSourceEnumerator can use it to define a
> > > >> > > >>> HybridSnapshotLogSplit, and the SplitReaders can perform pollBatch
> > > >> > > >>> operations on this snapshot (which would be bound to the specified
> > > >> > > >>> scanner_id).
> > > >> > > >>>
> > > >> > > >>> 4. Could you elaborate on why the new KvBatchScanner isn't
> > > >> > > >>> reusable? What's the reasoning behind this limitation? (I believe
> > > >> > > >>> RocksDB iterators do support the seekToFirst operation.) If a
> > > >> > > >>> TaskManager fails over before a checkpoint, rescanning an existing
> > > >> > > >>> snapshot seems like a natural requirement.
> > > >> > > >>>
> > > >> > > >>> 5. I think it would be beneficial to include some detailed design
> > > >> > > >>> aspects regarding Flink's integration with the new BatchScanner.
> > > >> > > >>>
> > > >> > > >>> Overall, this is a solid foundation for an important enhancement.
> > > >> > > >>> Looking forward to discussing these points further!
> > > >> > > >>>
> > > >> > > >>> Best regards, Yang
> > > >> > > >>>
> > > >> > > >>> On Wed, Oct 22, 2025 at 17:09, Wang Cheng <[email protected]>
> > > >> > > >>> wrote:
> > > >> > > >>>
> > > >> > > >>> > Hi all,
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and limit
> > > >> > > >>> > KV batch scan. The former approach is constrained by snapshot
> > > >> > > >>> > availability and remote storage performance, while the latter is
> > > >> > > >>> > only applicable to queries with a LIMIT clause and risks high
> > > >> > > >>> > memory pressure.
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>> > To address those limitations, Giannis Polyzos and I are writing
> > > >> > > >>> > to propose FIP-17: a general-purpose streaming KV scan for Fluss
> > > >> > > >>> > [1].
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>> > Any feedback and suggestions on this proposal are welcome!
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>> > [1]:
> > > >> > > >>> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC
> > > >> > > >>> >
> > > >> > > >>> > Regards,
> > > >> > > >>> > Cheng
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>> >
> > > >> > > >>>
> > > >> > > >>
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> Lorenzo Affetti
> > > >> Senior Software Engineer @ Flink Team
> > > >> Ververica <http://www.ververica.com>
> > > >>
> > > >
> > >
>
