leventov opened a new issue, #43762: URL: https://github.com/apache/arrow/issues/43762
### Describe the enhancement requested

I have a nagging feeling that the family of Arrow protocols and formats misses a generic protocol for streaming columnar data across ArrowArrays, such as to pack data from multiple arrays into a contiguous chunk of GPU memory. The goal is similar to [Dissociated IPC](https://arrow.apache.org/docs/format/DissociatedIPC.html), but I think a more flexible design is possible.

The times and places where the concerns of Dissociated IPC are relevant are also where an asynchronous programming model is often used (for example, because GPU operations are asynchronous with respect to the CPU). I therefore think it makes sense to consider asynchronous streaming with cancellation and backpressure, as well as off-loading a la Dissociated IPC, within a single protocol design space. Dissociated IPC's `<free_data>` signal always semantically coincides with other async streaming signals, as detailed below, so a separate message type is not needed.

Please don't take the description below too literally; it's a directional sketch and an invitation for discussion rather than a concrete proposal. It is also inspired by @westonpace's "[columnar container format](https://blog.lancedb.com/lance-v2/)", and can be seen as an attempt to do the same, but "for the wire/transfer" rather than "for the disk/storage".

## Protocol description

I think it makes the most sense to mostly inherit reactive streaming semantics from [RSocket](https://rsocket.io/), specifically the semantics of the [ERROR](https://rsocket.io/about/protocol#error-frame-0x0b), [CANCEL](https://rsocket.io/about/protocol#cancel-frame-0x09), [LEASE](https://rsocket.io/about/protocol#lease-frame-0x02), [REQUEST_N](https://rsocket.io/about/protocol#request_n-frame-0x08), and COMPLETE signals, and how they interact with payload streaming: see [Request Stream sequences](https://rsocket.io/about/protocol/#request-stream). Array streaming warrants one modification to RSocket semantics, however.
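To make the signal vocabulary concrete, here is a minimal sketch of the signals borrowed from RSocket, plus the row-based REQUEST_N variant discussed below. All names here are illustrative assumptions, not part of any existing Arrow or RSocket library:

```python
# Hypothetical signal vocabulary for the proposed stream protocol.
# Signal names follow RSocket; none of these types exist in any real library.
from dataclasses import dataclass
from enum import Enum, auto

class Signal(Enum):
    PAYLOAD = auto()    # carries Arrow buffers, or their addresses in off-loaded modes
    REQUEST_N = auto()  # flow control: Receiver requests n more *rows* (not payloads)
    CANCEL = auto()     # Receiver aborts the stream
    ERROR = auto()      # either side terminates the stream with an error
    COMPLETE = auto()   # Sender signals normal end of stream
    LEASE = auto()      # optional admission control, as in RSocket

@dataclass
class RequestN:
    """REQUEST_N measured in rows, the one modification to RSocket semantics."""
    n_rows: int
```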
RSocket’s flow control is defined in terms of *payload count* that the Requester (I call it Receiver below) requests from the Responder (I call it Sender) via the [REQUEST_N](https://rsocket.io/about/protocol/#request_n-frame-0x08) signal. However, the most reasonable unit for flow control in ArrowArray streaming is the number of records/rows, and there will typically be many rows in a single payload.

### Data plane characteristics negotiation

Before array streaming begins, Sender and Receiver should negotiate the data plane concerns to minimise copies and to optimise for the array production that precedes sending and the array consumption that follows reception, respectively.

**Transfer modes**

**1. Standard (in-band)**: the data transfer happens over the same transport as negotiation and control messaging. This is the default for Arrow Flight. This default transport will typically be gRPC, QUIC, or WebTransport.

**2. Off-loaded Receiver read**: Sender sends to Receiver the addresses of buffers in its CPU or GPU memory via a PAYLOAD message. Receiver then reads the buffers via an out-of-band transfer. Sender assumes that the buffers can be freed (or RDMA-unpinned, etc.) once Receiver sends any subsequent signal to Sender: ERROR, CANCEL, REQUEST_N (which implies that Receiver has completed processing of this payload and is ready to receive the next), or COMPLETE.

Note: This mode is optimal for operation pipelining on Receiver’s side. Receiver’s GPU can pipeline processing operations with RDMA reads in CUDA. This is the mode supported in Dissociated IPC. However, this mode “breaks the pipeline” on Sender’s side. When Receiver doesn’t have RDMA access to Sender’s memory, but they are still connected by some transport faster than UDP (or multiple transports in parallel), this mode is still preferable to sending the buffers in-band, albeit without any special win in Receiver’s GPU pipelining.

**3. Off-loaded Sender write**: Receiver sends to Sender the addresses of buffers in its CPU or GPU memory via a REQUEST_N message (alongside the requested number of rows). The buffers MUST be large enough to fit this number of rows. Sender then writes to these buffers via an out-of-band transfer. Sender can write fewer rows than requested (it’s not guaranteed to have that many rows). Sender sends to Receiver the number of rows written, along with other metadata (such as the number of bytes written into each buffer), via a subsequent PAYLOAD message. Upon receiving such a PAYLOAD message (or an ERROR or COMPLETE signal), Receiver can take ownership of the buffers back and free the resources that had to be allocated for writing, RDMA-unpin them, etc.

Note: This mode is optimal for pipelining on Sender’s side. Sender’s GPU can pipeline RDMA writes or `send()` right after the processing operations that produce the buffers. However, RDMA writes are rarely available, so in practice this mode can mostly be used when Sender and Receiver are on the same PCIe bus, i.e., in the same node.

**4. "Simple" off-loading**: Off-loading the data plane to a different connection (a different port and perhaps a different transport, such as raw QUIC vs. gRPC) than the standard (in-band) connection, on which the control plane remains. There could be performance or buffer alignment (see below) reasons to do this.

Note: This is what Dissociated IPC also supports, by introducing the possibility of separate Data and Metadata streams, but Dissociated IPC makes this mode a different "degree of freedom" than the "off-loaded Receiver read" mode. I don't understand the need for this, and it looks unnecessarily over-complicated to me.
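One way to picture the mode negotiation is a simple preference-matching handshake, sketched below. This is purely an assumption about how negotiation *could* work (names and the first-match policy are invented for illustration); the issue leaves the exact handshake open:

```python
# Hypothetical transfer-mode negotiation sketch; not a real Arrow API.
from enum import Enum

class TransferMode(Enum):
    STANDARD = 1        # mode 1: in-band, over the control transport
    OFFLOADED_READ = 2  # mode 2: Sender publishes buffer addresses, Receiver reads
    OFFLOADED_WRITE = 3 # mode 3: Receiver publishes buffers, Sender writes
    SIMPLE_OFFLOAD = 4  # mode 4: separate data-plane connection, same semantics

def negotiate(receiver_prefs, sender_supported):
    """Pick the first mode Receiver prefers that Sender supports, falling back
    to the always-available standard in-band mode."""
    for mode in receiver_prefs:
        if mode in sender_supported:
            return mode
    return TransferMode.STANDARD
```

For example, a Receiver preferring off-loaded reads would fall back to in-band transfer when talking to a Sender that cannot expose its memory for RDMA.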
**UCX or OFI/libfabric negotiation**

Ideally, UCX or OFI/libfabric negotiation (rndv protocol) and connection establishment should happen only once per stream, together with Sender and Receiver negotiating which transfer mode to use, rather than before each Array sent in the stream.

**Batch sizes and alignment**

Both Sender and Receiver may have minimum and/or optimal batch sizes. They should try to agree to both use the batch size optimal for the slower side, and then Receiver requests this agreed-upon number of rows in each of its REQUEST_N messages. Receiver can specify that it has a minimum batch size for the off-loaded modes. If Sender has highly irregular Array sizes, it should buffer them internally until it has prepared (e.g., read from disk) a sufficient number of rows. Only the trailing PAYLOAD segment may have insufficient rows, and it may fall back to the standard (in-band) mode.

Finally, Receiver may have requirements regarding buffer alignment. If the standard (in-band) transport doesn’t guarantee alignment of the frame’s data bytes without an extra memory copy (i.e., the 64-byte alignment of the first buffer in the PAYLOAD message), Sender and Receiver can try to avoid this copy by establishing a separate connection (such as on a different UDP port) with the transport protocol library configured to support such alignment.

### Flow control

Both Sender and Receiver must keep track of the total *cumulative* number of rows that Receiver has requested via REQUEST_N messages since the opening of the stream, and the total cumulative number of rows that Sender has claimed to deliver via PAYLOAD messages. Sender must never send PAYLOADs that make the latter number (delivered rows) exceed the former.
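The cumulative-row accounting above is simple enough to sketch directly. A minimal illustration, assuming both sides run the same bookkeeping (class and method names are invented for this sketch):

```python
# Sketch of cumulative row-based flow control, tracked identically
# on both Sender and Receiver. Names are hypothetical.
class RowFlowControl:
    def __init__(self):
        self.requested = 0  # total rows requested via REQUEST_N since stream open
        self.delivered = 0  # total rows Sender claimed via PAYLOAD messages

    def on_request_n(self, n_rows):
        """Receiver requested n_rows more rows (n_rows may be 0, see below)."""
        self.requested += n_rows

    def can_send(self, n_rows):
        """Sender must never let delivered rows exceed requested rows."""
        return self.delivered + n_rows <= self.requested

    def on_payload(self, n_rows):
        if not self.can_send(n_rows):
            raise RuntimeError("PAYLOAD would exceed cumulative requested rows")
        self.delivered += n_rows
```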
In the “Off-loaded Receiver read” mode, if Receiver needs to indicate to Sender that it no longer needs the buffers, but is still fine with the remaining number of requested rows and doesn’t want to increase it, Receiver should send a REQUEST_N message with n=0.

In addition, in the “Off-loaded Sender write” mode, both Sender and Receiver must keep track of the sequences (one sequence per Array buffer, to be precise) of Receiver’s memory regions that it offered via REQUEST_N messages, and the “high watermark” of written bytes that “flows through” these sequences of buffers. Sender may also copy this information (the watermarks) into PAYLOAD messages, to double-check with Receiver, mainly as a sanity measure.

### “Multiplexing”: Array splitting into multiple streams

I think it’s most reasonable that Sender and Receiver negotiate just one transfer mode per array stream. If multiple modes are needed, Sender can split the Array into groups of buffers that need to be transferred in different modes, with the original semantics “reconstructed” in the Receiver’s application.

It’s also most likely that the non-standard transfer modes (modes 2 and 3) can only work sanely for fixed-size layout buffers. For variable-size layouts, it’s too complex to pack buffers from multiple subsequent Arrays into the same contiguous memory regions, and I don’t know what the use cases for this would be, anyway. Thus, during the initial negotiation phase, Sender and Receiver can decide how to split Arrays into multiple groups and transfer them separately over different streams, possibly in different transfer modes. However, these streams should all share CANCEL, ERROR, COMPLETE, and REQUEST_N signals, happening on the standard (in-band) transport. The cumulative number of delivered rows (as claimed by Sender in PAYLOAD messages) in this case is assumed to be the minimum such count across all streams.
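The "minimum across streams" rule can be stated in a couple of lines. A sketch, assuming per-stream delivered-row counters keyed by some hypothetical stream label:

```python
# Sketch: with an Array split across multiple streams, a row is only fully
# delivered once every buffer group for it has arrived, so the effective
# cumulative delivered-row count is the minimum across all streams.
def effective_delivered_rows(per_stream_delivered):
    if not per_stream_delivered:
        return 0
    return min(per_stream_delivered.values())
```

For example, if the fixed-size buffer group has delivered 4096 rows over an off-loaded stream while a variable-size group has "over-delivered" 5000 rows in-band, the Receiver's application may only consume 4096 complete rows.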
It can be pointless to attempt to “splice” variable-size buffers that contain more rows than Sender is going to deliver in the groups in modes 2 and/or 3, so these groups *can* “over-deliver” rows, without loss of generality. If it needs to, Receiver’s application can keep track of scan pointers into these variable-size layout buffers and advance them in step with the minimum delivered row count.

Note that, semantically, this method of treating variable-size layouts is close to DictionaryBatches, whose sizes are not predictable relative to the sizes of the RecordBatches that they precede or interleave with; for example, a whole-stream DictionaryBatch with lots of rows may precede a *smaller* first RecordBatch. Thus, DictionaryBatches can be transferred over their own separate stream as well, again without loss of generality.

cc @zeroshade @raulcd @bkietz @pitrou @lidavidm

### Component(s)

FlightRPC, Format