Hi Giannis, Thanks for working on this, my concern is the continuation model.
The current design makes the server own both consistency and progress by keeping a live iterator session across RPCs. That is efficient, but it is also what makes retries awkward and assumes single-threaded iterator access, without enforcing it on the server side. More generally, once we keep server-side scan state across calls, we take on the usual session lifecycle machinery around expiry, fencing, and cleanup. There’s a middle ground worth evaluating explicitly: keep snapshot isolation if we need it, but return an opaque resume token, similar to Cassandra paging state, and have the client send it back unchanged on the next request. That keeps the consistency guarantee, but moves scan progress into the protocol instead of a live iterator session on the server. The tradeoff is an extra seek per batch. So in return, retries become much easier to reason about and the server has less mutable state to manage between calls. This feels close to what Lorenzo was asking to see evaluated. Also continuing Jark's reasoning about `call_seq_id` being simpler than key-based cursors - I think the comparison shifts when the token is opaque server-produced bytes rather than a client-constructed key, so the client never serializes anything, it just echoes the token back. Also I feel it would be good if the FIP compared: - live iterator session(HBase approach described in original FIP) - snapshot-only with continuation token(Cassandra style) - opaque stateless cursor(no snapshot isolation at all) and say why the extra machinery is justified for the use cases we’re targeting. KR, Anton чт, 26 мар. 2026 г. в 17:11, Giannis Polyzos <[email protected]>: > Hi Jark, > > Thank you for your input and for providing more context... I updated the > FIP based on your suggestions. > > I was debating with the API design, and my new design didn't take into > account the old design with the BatchScanner. I will follow the old design > since it's more consistent to what we have now. > > Best, > Giannis > > On Thu, Mar 26, 2026 at 5:50 PM Jark Wu <[email protected]> wrote: > > > Hi Lorenzo, > > > > I can help clarify some of the design decisions, as I was involved in > > the initial design of this FIP. > > > > (1) paginated requests with snapshot ID > > > > The current design of this FIP is exactly a paginated request with a > > snapshot ID. When a new scan request is initiated, the server takes a > > snapshot, and all subsequent streaming scan requests retrieve > > paginated data from that specific snapshot. > > > > The only distinction lies in the pagination mechanism. Your proposal > > uses keys, whereas the FIP utilizes `call_seq_id` (effectively a page > > ID). Adopting `call_seq_id` is a simpler and more efficient > > implementation. Using keys would be heavier, as they may require > > serializing multiple fields, significantly increasing the request > > payload size. > > > > > > (2) true streaming RPC > > IIUC, your design proposes a push-based model, whereas this FIP adopts > > a pull-based model, consistent with the architecture of Fluss and > > Kafka (e.g., in `FetchLog`). We chose the pull model for two key > > reasons: > > > > * Flow Control: In a push model, the server dictates the rate, > > risking consumer overload and causing backpressure that can exhaust > > server threads. The pull model allows clients to fetch data only when > > they are ready, naturally preventing congestion. > > * Replayability: Handling network failures is simpler with pulling. > > Clients can easily re-fetch missing data using a `page_id` or log > > offset. In contrast, managing state, offsets, and varying consumer > > speeds for thousands of clients in a push model adds significant > > complexity, effectively turning the server into a stateful dispatcher. > > > > Best, > > Jark > > > > > > > > On Fri, 27 Mar 2026 at 00:28, Jark Wu <[email protected]> wrote: > > > > > > Hi Giannis, > > > > > > Thanks for driving this. I only have some questions on the FIP. > > > > > > (1) Scan API > > > The FIP says "Remove the LIMIT requirement from > > > TableScan.createBatchScanner(TableBucket) for PK tables" which reuses > > > the existing "Table.newScan.createBatchScan(..)" API. But the Public > > > Interface section introduce a new API "Table.newKvScan.execute()". > > > > > > I believe extending the existing "Table.newScan().createBatchScan()" > > > to support non-LIMIT queries would provide a more unified approach. > > > Since this API already handles KV scan functionality, introducing a > > > separate KvScan interface could confuse users regarding which method > > > to choose. > > > > > > (2) The protobuf definition currently uses `uint32` and `uint64` for > > > several fields. Since Java lacks native support for unsigned integers, > > > this forces us to rely on `Integer.toUnsignedLong()` wrappers > > > throughout the codebase, adding unnecessary complexity. Switching to > > > `int32` and `int64` would significantly simplify the logic and improve > > > code readability. > > > > > > (3) Configuration. > > > I have a minor suggestion regarding the configuration naming. Since > > > this scanner is designed exclusively for KV tables, and our existing > > > server-side KV configurations consistently use the kv.* prefix, I > > > recommend changing server.scanner.* to kv.scanner.*. This adjustment > > > will ensure better consistency with our current naming conventions." > > > > > > Best, > > > Jark > > > > > > On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev > > > <[email protected]> wrote: > > > > > > > > Ehy Giannis! Sorry for the late reply. > > > > > > > > *Is the ScannerManager / session state actually required?* > > > > > > > > *The key thing with this design is snapshot isolation across the > entire > > > > bucket scan.* > > > > *A ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, > > and > > > > all subsequent batches read from that snapshot.* > > > > > > > > I understand, however, with the same approach, I see easier ways to > > > > implement this. For example, make the client reply not only with a > > > > "last_key" but also with a "snapshot_id". > > > > The server would need to store the open snapshots (this is the > stateful > > > > part) to re-use them upon request. > > > > You can drop the iterator, but hold a pointer to the snapshot; this > > should > > > > tell RocksDB not to compact it out as the snapshot is not released. > > > > > > > > This approach would drastically reduce server-side complexity and > hold > > > > significantly less state. > > > > I think this should be worth exploring, and if really impossible > added > > as a > > > > rejected alternative. > > > > > > > > > > > > *Could it be a single streaming RPC?* > > > > > > > > *Since the Fluss RPC layer is a custom Netty-based request/response > > > > protocol, not gRPC, introducing a server-streaming primitive would > > require > > > > many changes and effort.* > > > > > > > > I understand, however, missing such a primitive still impacts Fluss, > > and it > > > > seems to me that we are still paying here the cost of implementing a > > custom > > > > streaming RPC for a special case. > > > > It may be worth investing in the transport primitive once rather than > > > > duplicating this pattern every time a streaming operation is needed. > > > > One example is log tailing, from polling to true streaming RPC. > Another > > > > example is changelog streaming. > > > > > > > > What if we opt for streaming RPC FIP before this one so that this can > > > > directly benefit from that? > > > > > > > > > > > > On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos < > [email protected] > > > > > > > wrote: > > > > > > > > > Hi Lorenzo, > > > > > > > > > > And thank you for your comments. Lots of cool things to digest > here, > > and I > > > > > would be eager to hear more people's thoughts. > > > > > > > > > > Regarding the API > > > > > table.newScan() > > > > > .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG > > > > > .execute() > > > > > > > > > > Currently, the scan method is based on the table creation, so I > want > > to > > > > > differentiate between a scan (streaming consumption of a log) and > > KvScan (a > > > > > bounded rocksdb full table query). Then there is also some > > functionality > > > > > that we might not want to expose to the user, for example about the > > > > > snapshot. This should be internal functionality. > > > > > > > > > > *Is the ScannerManager / session state actually required?* > > > > > Pagination can definitely be a simple approach. However, the key > > thing > > > > > with this design is snapshot isolation across the entire bucket > > scan. A > > > > > ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, > > and all > > > > > subsequent batches read from that snapshot. This means: > > > > > 1. Every key that existed at scan-open time is returned exactly > once. > > > > > 2. No key is duplicated because it was compacted and rewritten > > between two > > > > > requests. > > > > > 3. No key is skipped because it was deleted and reinserted between > > two > > > > > requests. > > > > > With cursor pagination, you lose that guarantee. > > > > > > > > > > > > > > > *Could it be a single streaming RPC?* > > > > > This would be a great approach, but my understanding is that we > can't > > > > > support this. Since the Fluss RPC layer is a custom Netty-based > > > > > request/response protocol, not gRPC, introducing a server-streaming > > > > > primitive would require many changes and effort. Let me know if I'm > > > > > mistaken. > > > > > > > > > > Best, > > > > > Giannis > > > > > > > > > > > > > > > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev < > > > > > [email protected]> wrote: > > > > > > > > > >> Hello Giannis! > > > > >> Very instructive read, I went through that and I was actually > > astonished > > > > >> that this was not the "default" mode that Fluss operates on PK > > tables when > > > > >> a snapshot is requested. > > > > >> Very good that we have this initiative. > > > > >> > > > > >> *Is a new scanner type explicitly required?* > > > > >> > > > > >> I don't think it needs to be addressed now, but a good thing to > > keep in > > > > >> mind after this FIP gets implemented. > > > > >> > > > > >> > > > > >> The cleaner design would arguably be to unify these under a single > > > > >> abstraction where the scan source (live RocksDB vs. remote > > snapshot) and > > > > >> chunking behavior are implementation details, not separate scanner > > > > >> classes. > > > > >> The KvScan interface being introduced (table.newKvScan()) is a > step > > in > > > > >> that > > > > >> direction but it still sits alongside rather than replacing the > > existing > > > > >> scan path. > > > > >> > > > > >> A truly unified design might look like: > > > > >> > > > > >> table.newScan() > > > > >> .withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG > > > > >> .execute() > > > > >> > > > > >> I hope we keep this in mind for future, relevant situations. > > > > >> > > > > >> --------- > > > > >> > > > > >> *Is the ScannerManager / session state actually required?* > > > > >> > > > > >> The session exists because of the chunked, multi-RPC design -- the > > server > > > > >> needs to remember where the iterator is between calls. > > > > >> But is that statefulness necessary at all? > > > > >> > > > > >> The stateless alternative: cursor-based pagination > > > > >> > > > > >> Instead of keeping a server-side iterator session, you could do > > stateless > > > > >> resumption using a last-seen key as a cursor: > > > > >> > > > > >> Request 1: scan from beginning -> returns rows + > > last_key="key_0500" > > > > >> Request 2: scan from "key_0500" -> returns rows + > > last_key="key_1000" > > > > >> Request 3: scan from "key_1000" -> ... > > > > >> > > > > >> Each request is fully independent. The server opens a fresh > RocksDB > > > > >> iterator, seeks to the cursor key, reads a batch, and closes it. > No > > > > >> session, no TTL, no ScannerManager. > > > > >> > > > > >> Advantages: > > > > >> > > > > >> - Massively simpler server side -- no session lifecycle, no TTL > > reaper, > > > > >> no leadership fencing complexity > > > > >> - Naturally resilient to server restarts and leadership changes > > -- > > > > >> client just retries from the last cursor > > > > >> - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed > > > > >> > > > > >> Tradeoffs: > > > > >> > > > > >> - *Consistency weakens slightly* -- each batch opens a fresh > > RocksDB > > > > >> snapshot, so you might see a key move between batches if it was > > updated > > > > >> between requests. With the session approach, the entire bucket > > scan is > > > > >> under one snapshot. > > > > >> - Seek cost -- RocksDB iterator seeks are not free, especially > > across > > > > >> many SST files. For very large tables with many chunks this > adds > > up, > > > > >> though > > > > >> for the small key spaces FIP-17 targets it's likely negligible. > > > > >> - Cursor encoding needs care for binary keys. > > > > >> > > > > >> Could it be a single streaming RPC? > > > > >> > > > > >> Rather than a request/response sequence with session state, you'd > > have a > > > > >> single server-streaming RPC: > > > > >> > > > > >> client -> ScanKvRequest (open) > > > > >> server -> stream of ScanKvResponse chunks > > > > >> server -> closes stream when exhausted > > > > >> > > > > >> If this is possible, the entire ScannerManager session complexity > > > > >> evaporates -- the iterator just lives for the duration of the > > stream, held > > > > >> naturally by the connection. > > > > >> > > > > >> > > > > >> > > > > >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee < > > [email protected]> > > > > >> wrote: > > > > >> > > > > >> > Hello Giannis, > > > > >> > > > > > >> > Thank you for the update to the proposal! Quickly skimmed > through > > and I > > > > >> > like the updates that you’ve made! Questions / comments below: > > > > >> > > > > > >> > 1. You mentioned an extra section on heartbeat on the FIP, but I > > do not > > > > >> see > > > > >> > heartbeat being mentioned on the latest version of the FIP? +1 > > If the > > > > >> > proposal is updated to rely solely on last scan for TTL and > remove > > > > >> > heartbeat, it’s a great change. If I remember correctly, the > > previous > > > > >> > version was to use heartbeat as keepalive, there is a risk of > > unclosed, > > > > >> > idle scanner holding resources on server side indefinitely and > > causing > > > > >> > leak. > > > > >> > > > > > >> > 2. On continuation request, should we check lastAccessTimeMs and > > reject > > > > >> if > > > > >> > elapsed time is larger than TTL? Otherwise sessions can idle > > between 60 > > > > >> and > > > > >> > 90 (TTL+ reaper interval). This might be exacerbated if user > > configure > > > > >> > particularly high TTL and reaper interval. > > > > >> > > > > > >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate error > > for > > > > >> expired > > > > >> > scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER > (renaming > > > > >> > UNKNOWN_SCANNER_ID). These are both terminal and non retriable, > I > > > > >> imagine > > > > >> > that handling it from client side would not differ. It’s also a > > small > > > > >> > simplification to the implementation. > > > > >> > > > > > >> > 4. On pipelining. If the user queries for top-n every 10 seconds > > to > > > > >> update > > > > >> > leaderboard, would pipelining cause higher unnecessary traffic? > > E.g. > > > > >> they > > > > >> > only care about n records but pipelining automatically fetch up > > to 8mb. > > > > >> > > > > > >> > 5. Also on pipelining, while it seems that we’re keeping Flink > > connector > > > > >> > out of scope, IIRC Flink split fetcher also pipelines. If we use > > this to > > > > >> > update Flink connector, we’d have higher amount buffered in > > pipeline. > > > > >> > > > > > >> > 6. On expiration interval, should we hide that configuration and > > choose > > > > >> to > > > > >> > expose it if there’s a strong need for it? It’s fewer config for > > users > > > > >> to > > > > >> > reason about and 30s expiration sounds like a good starting > point. > > > > >> > > > > > >> > Best regards > > > > >> > > > > > >> > Keith Lee > > > > >> > > > > > >> > > > > > >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos < > > [email protected]> > > > > >> > wrote: > > > > >> > > > > > >> > > Hi devs, > > > > >> > > Let me know if there are any comments here, otherwise I would > > like to > > > > >> > start > > > > >> > > a vote thread. > > > > >> > > > > > > >> > > Best, > > > > >> > > Giannis > > > > >> > > > > > > >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos < > > [email protected] > > > > >> > > > > > >> > > wrote: > > > > >> > > > > > > >> > > > Hi devs, > > > > >> > > > > > > > >> > > > After a long time, i will like to reinitiate the discussions > > on > > > > >> FIP-17. > > > > >> > > > > > > > >> > > > I made quite a few updates on the FIP, which you can find > > here: > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries > > > > >> > > > and updated the title to better reflect the goal. Let me > know > > if it > > > > >> > makes > > > > >> > > > sense. > > > > >> > > > > > > > >> > > > Moreover in the end of the proposal, you will find a section > > as > > > > >> *extras > > > > >> > > *which > > > > >> > > > has a suggestion for a heartbeat mechanism. However, during > > my PoC, > > > > >> I > > > > >> > > found > > > > >> > > > that this is not really needed, but > > > > >> > > > I would like your thoughts and feedback first. > > > > >> > > > > > > > >> > > > Best, > > > > >> > > > Giannis > > > > >> > > > > > > > >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos < > > > > >> [email protected] > > > > >> > > > > > > >> > > > wrote: > > > > >> > > > > > > > >> > > >> Yang, thank you for your thoughtful comments. > > > > >> > > >> > > > > >> > > >> Indeed, we are streaming the results to the client; > however, > > it's > > > > >> > still > > > > >> > > a > > > > >> > > >> batch operation. We could use "KV store (or PK table) > > Snapshot > > > > >> Query" > > > > >> > > or > > > > >> > > >> something similar, since we are querying a RocksDB > snapshot. > > WDYT? > > > > >> > > >> The newly introduced KvBatchScanner should be able to be > > reused > > > > >> from > > > > >> > > both > > > > >> > > >> the client itself - assume a scenario that I want to > > periodically > > > > >> > query > > > > >> > > the > > > > >> > > >> full RocksDB KV store to power real-time dashboards - as > > well as > > > > >> Flink > > > > >> > > >> (with more engines to follow later). > > > > >> > > >> It issues requests to fetch the results per bucket and > > transmit > > > > >> them > > > > >> > > back > > > > >> > > >> to the client. > > > > >> > > >> > > > > >> > > >> > Could you elaborate on why the new KvBatchScanner isn't > > reusable? > > > > >> > > >> I think the reasoning here is that reach requests create a > > new > > > > >> > > >> KvBatchScanner, which polls the records and then closes > > > > >> automatically. > > > > >> > > Any > > > > >> > > >> reason you see this as a limitation, and we should consider > > making > > > > >> it > > > > >> > > >> reusable? > > > > >> > > >> > > > > >> > > >> The design aims mainly for the Fluss client API.. Should we > > add an > > > > >> > > >> integration design with Flink? Wang Cheng, WDYT? > > > > >> > > >> > > > > >> > > >> Best, > > > > >> > > >> Giannis > > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > > > > >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang < > > > > >> [email protected]> > > > > >> > > >> wrote: > > > > >> > > >> > > > > >> > > >>> Hi Cheng, > > > > >> > > >>> > > > > >> > > >>> Thank you for driving this excellent work! Your FIP > > document shows > > > > >> > > great > > > > >> > > >>> thought and initiative. I've gone through it and have some > > > > >> questions > > > > >> > > and > > > > >> > > >>> suggestions that I hope can further enhance this valuable > > > > >> > contribution. > > > > >> > > >>> > > > > >> > > >>> 1、Regarding the Title, I believe we could consider > changing > > it to > > > > >> > > >>> "Support > > > > >> > > >>> full scan in batch mode for PrimaryKey Table". The term > > > > >> "Streaming" > > > > >> > > might > > > > >> > > >>> cause confusion with Flink's streaming/batch modes, and > this > > > > >> revised > > > > >> > > >>> title > > > > >> > > >>> would provide better clarity. > > > > >> > > >>> > > > > >> > > >>> 2、In the Motivation section, I think there are two > > particularly > > > > >> > > important > > > > >> > > >>> benefits worth highlighting: (1) OLAP engines will be able > > to > > > > >> perform > > > > >> > > >>> full > > > > >> > > >>> snapshot reads on Fluss primary-key tables. (2) This > > approach can > > > > >> > > replace > > > > >> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss > > client to > > > > >> > > >>> eliminate > > > > >> > > >>> its RocksDB dependency entirely. > > > > >> > > >>> > > > > >> > > >>> 3、Concerning the Proposed Changes, could you clarify when > > exactly > > > > >> the > > > > >> > > >>> client creates a KV snapshot on the server side, and when > > we send > > > > >> the > > > > >> > > >>> bucket_scan_req? > > > > >> > > >>> > > > > >> > > >>> Let me share my thinking on this: When Flink attempts to > > read > > > > >> from a > > > > >> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the > JobMaster > > > > >> > generates > > > > >> > > >>> HybridSnapshotLogSplit and dispatches them to SplitReaders > > > > >> running on > > > > >> > > the > > > > >> > > >>> TaskManager. The JobMaster doesn't actually read data—it > > merely > > > > >> > defines > > > > >> > > >>> and > > > > >> > > >>> manages the splits. Therefore, we need to ensure the JM > has > > > > >> > sufficient > > > > >> > > >>> information to determine the boundary of the KV snapshot > > and the > > > > >> > > >>> startOffset of the LogSplit. > > > > >> > > >>> > > > > >> > > >>> I suggest we explicitly create a snapshot (or as you've > > termed > > > > >> it, a > > > > >> > > >>> new_scan_request) on the server side. This way, the > > > > >> > > FlinkSourceEnumerator > > > > >> > > >>> can use it to define a HybridSnapshotLogSplit, and the > > > > >> SplitReaders > > > > >> > can > > > > >> > > >>> perform pollBatch operations on this snapshot (which would > > be > > > > >> bound > > > > >> > to > > > > >> > > >>> the > > > > >> > > >>> specified scanner_id). > > > > >> > > >>> > > > > >> > > >>> 4、 Could you elaborate on why the new KvBatchScanner isn't > > > > >> reusable? > > > > >> > > >>> What's > > > > >> > > >>> the reasoning behind this limitation? (I believe RocksDB > > > > >> iterators do > > > > >> > > >>> support the seekToFirst operation.) If a TaskManager fails > > over > > > > >> > before > > > > >> > > a > > > > >> > > >>> checkpoint, rescanning an existing snapshot seems like a > > natural > > > > >> > > >>> requirement. > > > > >> > > >>> > > > > >> > > >>> 5、I think it would be beneficial to include some detailed > > design > > > > >> > > aspects > > > > >> > > >>> regarding Flink's integration with the new BatchScanner. > > > > >> > > >>> > > > > >> > > >>> Overall, this is a solid foundation for an important > > enhancement. > > > > >> > > Looking > > > > >> > > >>> forward to discussing these points further! > > > > >> > > >>> > > > > >> > > >>> Best regards, Yang > > > > >> > > >>> > > > > >> > > >>> Wang Cheng <[email protected]> 于2025年10月22日周三 > > 17:09写道: > > > > >> > > >>> > > > > >> > > >>> > Hi all, > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan > and > > > > >> limit KV > > > > >> > > >>> batch > > > > >> > > >>> > scan. The former approach is constrained by snapshot > > > > >> availability > > > > >> > and > > > > >> > > >>> > remote storage performance, while the later one is only > > > > >> applicable > > > > >> > to > > > > >> > > >>> > queries with LIMIT clause and risks high memory > pressure. > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > To address those limitations, Giannis Polyzos and I are > > writing > > > > >> to > > > > >> > > >>> propose > > > > >> > > >>> > FIP-17: a general-purpose streaming KV scan for Fluss > [1]. > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > Any feedback and suggestions on this proposal are > welcome! > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > [1]: > > > > >> > > >>> > > > > > >> > > >>> > > > > >> > > > > > > >> > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC > > > > >> > > >>> > > > > > >> > > >>> > Regards, > > > > >> > > >>> > Cheng > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > > > > > >> > > >>> > > > > >> > > >> > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > >> -- > > > > >> Lorenzo Affetti > > > > >> Senior Software Engineer @ Flink Team > > > > >> Ververica <http://www.ververica.com> > > > > >> > > > > > > > > > > > > > -- > > > > Lorenzo Affetti > > > > Senior Software Engineer @ Flink Team > > > > Ververica <http://www.ververica.com> > > >
