lnbest0707-uber opened a new issue, #16643: URL: https://github.com/apache/pinot/issues/16643
### Justification -- Row vs Column

Pinot is a columnar storage system: data in each segment is highly compressed column by column according to the schema. On the other side, Pinot's real-time ingestion, which connects to streaming systems such as Kafka, consumes data row by row in the `GenericRow` format. Row-by-row message delivery keeps latency low, but many practical use cases, e.g. Observability, do not require millisecond-level latency guarantees, and batching messages on the producer and in Kafka is very common, so the data could be compressed far more efficiently. Due to format conventions, however, data is still usually shipped in row formats such as JSON and Avro, which creates conversion inefficiency across the whole system. Today data flows through:

1. The producer generates raw batched data (e.g. in an [OTel Observability service](https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/trace/v1/trace.proto#L38))
2. The producer converts it to Avro or JSON rows and sends them to Kafka
3. Kafka mini-batches and compresses the messages
4. Pinot decodes and ingests the row data
5. Pinot stores the data into columnar segments

To avoid this overhead, the proposal is to bring in [Apache Arrow](https://arrow.apache.org/docs/index.html), a columnar data format designed for zero-copy memory sharing. It is widely adopted by [OpenTelemetry](https://opentelemetry.io/), the rising standard for Observability (logging, tracing and metrics) tech stacks.

### Ingestion, Goals & Milestones

#### Stage 1. New Decoder -> Lower Kafka Data Volume

Introduce an Arrow decoder that works with the other existing Pinot components. A batch of data would still be ingested into Pinot in the same `GenericRow` format and work with the existing transformer and indexing systems. With a batching interval of a few hundred milliseconds to one second, plus dictionary-encoded Arrow data, the final Kafka data volume could decrease thanks to the higher compression ratio. (A minimal decoder sketch is attached at the end of this issue.)

#### Stage 2. Columnar Transformers -> Lower Ingestion CPU Usage

Instead of breaking a batch into multiple rows and transforming them one by one, there should be transformers dedicated to columnar data that perform column-based transformations. Ingestion CPU usage is expected to drop thanks to more performant "decoding + transformation" combinations. (See the column-wise transform sketch below.)

#### Stage 3. Evolve Mutable Segment -> Reduced Memory Copy

Arrow is famous for its zero-copy shared memory, which opens a further opportunity to remove data copies during ingestion. Pinot heavily uses DirectBuffer-backed in-memory forward indexes, and the current process requires one or more memory copies to allocate the data. By utilizing Arrow, there is an opportunity to decode once and reuse the same memory across the entire lifecycle until flushing to disk, reducing both CPU and memory overhead.

#### [Tentative] Stage 4. E2E Arrow Data Format

### Query

#### Streaming and Compressed Response Format

Pinot mainly returns a query response as one whole JSON document. The data is not compressed at all and cannot be streamed. For some use cases a single query response exceeds 100 MB, which introduces high networking and IO overhead. Arrow IPC is a candidate for delivering streaming, compressed query responses. Moreover, it could fit better with OTel Observability systems, since the query service on the other side could avoid extra data format conversions. (A streaming response sketch is attached at the end of this issue.)
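To make Stage 1 concrete, below is a minimal sketch (not an implementation) of the decoding path, assuming each Kafka payload carries an Arrow IPC stream. It uses Arrow Java's `ArrowStreamReader`; the `Map<String, Object>` rows are stand-ins for `GenericRow`, and a real decoder would presumably plug into Pinot's stream message decoder SPI and call `GenericRow.putValue` with proper type mapping.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;

/**
 * Sketch: decode one Kafka payload containing an Arrow IPC stream into
 * per-row maps (stand-ins for Pinot's GenericRow).
 */
public class ArrowBatchDecoderSketch {

  public static List<Map<String, Object>> decode(byte[] payload) throws Exception {
    List<Map<String, Object>> rows = new ArrayList<>();
    try (BufferAllocator allocator = new RootAllocator();
         ArrowStreamReader reader =
             new ArrowStreamReader(new ByteArrayInputStream(payload), allocator)) {
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      // One IPC stream may carry several record batches.
      while (reader.loadNextBatch()) {
        for (int i = 0; i < root.getRowCount(); i++) {
          Map<String, Object> row = new HashMap<>();
          for (FieldVector vector : root.getFieldVectors()) {
            // getObject() materializes the cell; a real decoder would map
            // Arrow types to Pinot types and call GenericRow.putValue(...).
            row.put(vector.getField().getName(), vector.getObject(i));
          }
          rows.add(row);
        }
      }
    }
    return rows;
  }
}
```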
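For Stage 2, a column-based transformation could operate on a whole Arrow vector at once instead of one `GenericRow` field at a time. A trivial hedged example, assuming a BIGINT time column stored in epoch seconds that should be converted to epoch millis:

```java
import org.apache.arrow.vector.BigIntVector;

/** Sketch of a column-wise transform applied to an entire Arrow vector. */
public class ColumnarTransformSketch {

  // Converts epoch seconds to epoch millis over the whole column in one pass,
  // rather than transforming each row's field individually.
  public static void secondsToMillis(BigIntVector column) {
    for (int i = 0; i < column.getValueCount(); i++) {
      if (!column.isNull(i)) {
        column.set(i, column.get(i) * 1000L);
      }
    }
  }
}
```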
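For the query side, here is a rough sketch of streaming results back as Arrow IPC record batches using Arrow Java's `ArrowStreamWriter`. The `BatchSource`/`fillNextBatch` names are hypothetical placeholders for whatever feeds result blocks out of the query engine; the Arrow IPC format also supports buffer compression (e.g. LZ4/ZSTD), which could address the compression gap described above.

```java
import java.io.OutputStream;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

/**
 * Sketch: stream query results to the client as Arrow IPC record batches
 * instead of a single large JSON response.
 */
public class ArrowResponseWriterSketch {

  /** Hypothetical adapter over the query engine's result blocks. */
  public interface BatchSource {
    // Loads the next chunk of results into 'root'; returns false when done.
    boolean fillNextBatch(VectorSchemaRoot root);
  }

  public static void writeResponse(VectorSchemaRoot root, OutputStream out,
                                   BatchSource source) throws Exception {
    try (ArrowStreamWriter writer = new ArrowStreamWriter(root, /*dictionaries=*/ null, out)) {
      writer.start();
      while (source.fillNextBatch(root)) {
        writer.writeBatch();  // flush one record batch to the wire
      }
      writer.end();
    }
  }
}
```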
