alamb opened a new issue, #23197:
URL: https://github.com/apache/datafusion/issues/23197

   ### Is your feature request related to a problem or challenge?
   
   This ticket tracks various ideas to make evaluating window functions faster for single, often large, windows. This has come up several times recently with @avantgardnerio, @2010YOUY01, and @wirybeaver, among others.
   
   The core use case is to make evaluating a query like this fast:
   ```sql
   SELECT
       -- moving average over the current row and the 5 previous rows
       AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
   FROM metrics;
   ```
   
   The key property is that individual windows contain a large number of rows, because there is no `PARTITION BY` in the window clause (`OVER`) and the entire input therefore forms a single window partition.
   
   (Note that a similar problem can occur when there is a `PARTITION BY` clause but the window partitions are imbalanced, due to skew or a small number of partitions.)
   
   Today, each window is evaluated in a single DataFusion partition, which means:
   1. They are limited to a single core
   2. They are limited to a single machine in distributed environments
   
   
   # Background
   DataFusion has support for many window functions (see [doc link](https://datafusion.apache.org/user-guide/sql/window_functions.html)). Window functions are used like this:
   ```sql
   SELECT
       customer, time, cpu_usage,
       -- moving average over the current row and the 5 previous rows
       AVG(cpu_usage) OVER (PARTITION BY customer ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
     FROM
       metrics;
   ```
   
   This produces output like the following:
   ```text
   +----------+---------------------+-----------+---------+
   | customer | time                | cpu_usage | avg_cpu |
   +----------+---------------------+-----------+---------+
   | acme     | 2026-01-01T00:00:00 | 10.0      | 10.0    |
   | acme     | 2026-01-01T00:01:00 | 20.0      | 15.0    |
   ...
   | globex   | 2026-01-01T00:03:00 | 45.0      | 30.0    |
   | globex   | 2026-01-01T00:04:00 | 55.0      | 35.0    |
   +----------+---------------------+-----------+---------+
   ```
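To make the frame semantics concrete, here is a minimal sequential sketch in Rust (DataFusion's implementation language) of what `ROWS BETWEEN 5 PRECEDING AND CURRENT ROW` computes for one window partition whose rows are already sorted by `time`. The function name `moving_avg` is hypothetical and this is not DataFusion's implementation; it simply keeps a running sum, adding the row that enters the frame and subtracting the row that leaves it.

```rust
/// Evaluates AVG(x) OVER (ORDER BY ... ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW)
/// for one window partition whose rows are already sorted by the ORDER BY key.
/// Hypothetical illustration, not DataFusion code.
pub fn moving_avg(values: &[f64], preceding: usize) -> Vec<f64> {
    let mut out = Vec::with_capacity(values.len());
    let mut sum: f64 = 0.0;
    for (i, &v) in values.iter().enumerate() {
        sum += v; // row i enters the frame
        if i > preceding {
            sum -= values[i - preceding - 1]; // the oldest row leaves the frame
        }
        let frame_len = i.min(preceding) + 1; // frames are shorter at the start
        out.push(sum / frame_len as f64);
    }
    out
}
```

For the `globex` rows (15.0, 25.0, 35.0, 45.0, 55.0) it returns 15.0, 20.0, 25.0, 30.0, 35.0, matching the `avg_cpu` values shown above for 00:03 and 00:04. Note that the loop is inherently sequential: row `i` depends on the running sum left behind by row `i - 1`.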
   
   The plan for such a query looks like the following, and typically keeps all cores fully occupied:
   ```text
   BoundedWindowExec(...)
     Repartition (on customer) <--- this is responsible for dividing work among partitions and thus cores
       Sort(customer, time)
          Scan
       Sort(customer, time)
          Scan
   ```
   Here is an example from the tests: 
https://github.com/apache/datafusion/blob/a0e9887550065324320c6fd52001aa23bae67485/datafusion/sqllogictest/test_files/window_topk_pushdown.slt#L112-L118
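This plan scales because rows can be routed to output partitions by hashing the `PARTITION BY` key, so no window partition ever straddles two DataFusion partitions. A minimal Rust sketch of that routing follows; `Row` and `hash_repartition` are hypothetical, and DataFusion's `RepartitionExec` works on Arrow record batches rather than Rust tuples.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical row of the `metrics` table: (customer, time, cpu_usage).
pub type Row = (String, i64, f64);

/// Routes each row to one of `n` output partitions by hashing only the
/// PARTITION BY column, so all rows of one customer land in the same partition.
pub fn hash_repartition(rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    assert!(n > 0, "need at least one output partition");
    let mut parts: Vec<Vec<Row>> = (0..n).map(|_| Vec::new()).collect();
    for row in rows {
        let mut hasher = DefaultHasher::new();
        row.0.hash(&mut hasher); // hash the customer column only
        let idx = (hasher.finish() % n as u64) as usize;
        parts[idx].push(row);
    }
    parts
}
```

Each output partition can then be sorted by `(customer, time)` and evaluated independently on its own core. Without a `PARTITION BY` clause there is no key to hash on, which is why the unpartitioned plan has no `Repartition`.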
   
   However, for the case in question, when we remove `PARTITION BY customer` from the `OVER` clause:
   
   ```sql
   SELECT
       customer, time, cpu_usage,
       -- moving average over the current row and the 5 previous rows
       AVG(cpu_usage) OVER (ORDER BY time ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) as avg_cpu
     FROM
       metrics;
   ```
   
   The plan then looks like this (there is no `Repartition`, so it executes on a single core):
   ```text
   BoundedWindowExec(...) <-- single partition, single core
       Sort(time)
          Scan
   ```
   
   Here is an example from the tests: https://github.com/apache/datafusion/blob/a0e9887550065324320c6fd52001aa23bae67485/datafusion/sqllogictest/test_files/window.slt#L4100-L4103
   
   <details><summary>Table definition</summary>
   <p>
   
   ```sql
     CREATE TABLE metrics (
       time TIMESTAMP,
       customer VARCHAR,
       cpu_usage DOUBLE
     );
   
     INSERT INTO metrics VALUES
       (TIMESTAMP '2026-01-01 00:00:00', 'acme', 10.0),
       (TIMESTAMP '2026-01-01 00:01:00', 'acme', 20.0),
       (TIMESTAMP '2026-01-01 00:02:00', 'acme', 30.0),
       (TIMESTAMP '2026-01-01 00:03:00', 'acme', 40.0),
       (TIMESTAMP '2026-01-01 00:04:00', 'acme', 50.0),
       (TIMESTAMP '2026-01-01 00:00:00', 'globex', 15.0),
       (TIMESTAMP '2026-01-01 00:01:00', 'globex', 25.0),
       (TIMESTAMP '2026-01-01 00:02:00', 'globex', 35.0),
       (TIMESTAMP '2026-01-01 00:03:00', 'globex', 45.0),
       (TIMESTAMP '2026-01-01 00:04:00', 'globex', 55.0);
   ```
   
   </p>
   </details> 
   
   
   The performance currently relied 
   
   ### Describe the solution you'd like
   
   * DataFusion can use multiple cores to evaluate window functions over large windows
   * Distributed systems such as Ballista can use multiple machines to compute window function results in parallel
   * DataFusion can complete window function queries even when the windows are larger than available memory
   
   ### Describe alternatives you've considered
   
   Here are some ideas for improving this use case:
   1. https://github.com/apache/datafusion/pull/23124 from @Dandandan 
   
   
   ### Additional context
   
     ## Prefix Sums / Prefix Scan
   
     Used for cumulative windows such as `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`.
   
     - Paper: [Prefix Sums and Their Applications](https://www.cs.cmu.edu/~guyb/papers/Ble93.pdf)
     - PoC from @avantgardnerio: https://github.com/coralogix/arrow-datafusion/pull/426
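A cumulative `SUM` is exactly an inclusive prefix sum. The Rust sketch below uses the simple chunked two-pass scan rather than the tree-based algorithm from the Blelloch paper: each thread scans its own chunk, a short sequential step turns the chunk totals into offsets, and a second parallel pass adds those offsets. `parallel_prefix_sum` is a hypothetical name and the sketch is not taken from the linked PoC.

```rust
use std::thread;

/// Inclusive prefix sums, i.e. the result of
/// SUM(x) OVER (ORDER BY ... ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
/// computed with a chunked two-pass scan on up to `num_chunks` threads.
pub fn parallel_prefix_sum(values: &[i64], num_chunks: usize) -> Vec<i64> {
    if values.is_empty() {
        return Vec::new();
    }
    let chunk_len = values.len().div_ceil(num_chunks.max(1));
    let mut out = values.to_vec();

    // Pass 1 (parallel): local inclusive scan inside each chunk.
    thread::scope(|s| {
        for chunk in out.chunks_mut(chunk_len) {
            s.spawn(move || {
                for i in 1..chunk.len() {
                    chunk[i] += chunk[i - 1];
                }
            });
        }
    });

    // Sequential step: exclusive scan over chunk totals gives each chunk's offset.
    let mut offsets: Vec<i64> = Vec::new();
    let mut running: i64 = 0;
    for chunk in out.chunks(chunk_len) {
        offsets.push(running);
        running += chunk[chunk.len() - 1];
    }

    // Pass 2 (parallel): add the offset of all earlier chunks to every element.
    thread::scope(|s| {
        for (chunk, &offset) in out.chunks_mut(chunk_len).zip(offsets.iter()) {
            s.spawn(move || {
                for v in chunk.iter_mut() {
                    *v += offset;
                }
            });
        }
    });
    out
}
```

The sequential middle step touches only one value per chunk, so almost all of the work runs in parallel. The same structure applies to any associative combining operation, such as `MIN`, `MAX`, or the `(sum, count)` pair behind `AVG`.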
   
     ## "Halo" Rows / Parallel Bounded Windows
   
     Used for bounded window frames where each output partition needs some rows from neighboring ranges. Note: I don't think "halo rows" is a standard database term; here it means "overlapping partitions with replicated boundary rows".
   
     - Main PoC from @avantgardnerio: https://github.com/apache/datafusion/pull/23026
     - Runtime partition extrema design: https://github.com/apache/datafusion/issues/23089
     - Runtime partition extrema PR: https://github.com/apache/datafusion/pull/23090
     - Dynamic range partitioning design: https://github.com/apache/datafusion/issues/23093
     - Dynamic range partitioning PR: https://github.com/apache/datafusion/pull/23094
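The halo idea for a `ROWS BETWEEN n PRECEDING AND CURRENT ROW` frame can be sketched in Rust as follows: the sorted input is cut into contiguous chunks, each chunk is evaluated on its own thread together with up to `n` rows copied from just before its start (the halo), and the outputs computed for the halo rows are discarded. The function names are hypothetical and this is not the code in the linked PoC.

```rust
use std::thread;

/// Sequential reference: AVG over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
fn moving_avg_seq(values: &[f64], preceding: usize) -> Vec<f64> {
    let mut out = Vec::with_capacity(values.len());
    let mut sum: f64 = 0.0;
    for (i, &v) in values.iter().enumerate() {
        sum += v;
        if i > preceding {
            sum -= values[i - preceding - 1];
        }
        out.push(sum / (i.min(preceding) + 1) as f64);
    }
    out
}

/// Hypothetical parallel version using halo rows: each chunk is evaluated with up
/// to `preceding` rows copied from just before it, and the halo outputs are dropped.
pub fn moving_avg_halo(values: &[f64], preceding: usize, num_chunks: usize) -> Vec<f64> {
    if values.is_empty() {
        return Vec::new();
    }
    let chunk_len = values.len().div_ceil(num_chunks.max(1));
    let starts: Vec<usize> = (0..values.len()).step_by(chunk_len).collect();
    let pieces: Vec<Vec<f64>> = thread::scope(|s| {
        let handles: Vec<_> = starts
            .iter()
            .map(move |&start| {
                let end = (start + chunk_len).min(values.len());
                let halo_start = start.saturating_sub(preceding); // halo = rows halo_start..start
                s.spawn(move || {
                    let local = moving_avg_seq(&values[halo_start..end], preceding);
                    local[start - halo_start..].to_vec() // keep only this chunk's own rows
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    pieces.concat()
}
```

Every row's frame lies entirely inside its own chunk plus halo, so each chunk sees exactly the same frames as sequential evaluation (with floating-point inputs the results can differ in the last bits, because sums are accumulated in a different order). The cost is re-reading up to `n` extra rows per chunk, which is small when frames are short relative to chunks; `RANGE` frames would need a halo defined by key distance rather than by row count.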
   
     ## WindowAggExec Memory / Spilling
   
     Avoid `WindowAggExec` OOMs by processing partitions incrementally and spilling when needed.
   
     - Support spilling for `WindowAggExec`: https://github.com/apache/datafusion/issues/22946
     - PoC from @wirybeaver: https://github.com/apache/datafusion/pull/22947
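When the input is already sorted by the `PARTITION BY` key, an operator only needs to buffer one window partition at a time: once the key changes, the finished partition can be evaluated, emitted, and dropped. The Rust sketch below shows that buffering pattern with hypothetical names (`evaluate_per_partition`, `eval_and_emit`) and returns the peak number of buffered rows. It does not show spilling, which would still be needed when a single partition (such as the unpartitioned windows this issue is about) exceeds memory.

```rust
/// Feeds rows that arrive sorted by partition key to `eval_and_emit` one
/// window partition at a time and returns the peak number of rows buffered.
/// Hypothetical illustration; spilling to disk is not shown.
pub fn evaluate_per_partition<K, V, I, F>(rows: I, mut eval_and_emit: F) -> usize
where
    K: PartialEq,
    I: IntoIterator<Item = (K, V)>,
    F: FnMut(&[V]),
{
    let mut current_key: Option<K> = None;
    let mut buffer: Vec<V> = Vec::new();
    let mut peak: usize = 0;
    for (key, value) in rows {
        // A new key means the previous partition is complete.
        if current_key.as_ref().is_some_and(|k| *k != key) {
            eval_and_emit(&buffer);
            buffer.clear(); // release the finished partition's rows
        }
        current_key = Some(key);
        buffer.push(value);
        peak = peak.max(buffer.len());
    }
    // Flush the last partition.
    if !buffer.is_empty() {
        eval_and_emit(&buffer);
    }
    peak
}
```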
   
     ## Window Frame Evaluation / Vectorization
   
     Improve the CPU and memory efficiency for window frame boundary calculations:
   
     - https://github.com/apache/datafusion/issues/7518
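One example of the kind of boundary computation involved: for `RANGE BETWEEN k PRECEDING AND CURRENT ROW` over sorted keys, both frame boundaries only ever move forward, so all of them can be found in one linear sweep with two pointers instead of a binary search per row. The Rust sketch below is a hypothetical illustration (`range_frame_bounds` is not a DataFusion function) and assumes integer keys sorted ascending with no NULLs.

```rust
/// Computes half-open frame bounds `[start, end)` of
/// RANGE BETWEEN `k` PRECEDING AND CURRENT ROW for `keys` sorted ascending.
/// Both pointers only move forward, so the whole pass is O(n).
pub fn range_frame_bounds(keys: &[i64], k: i64) -> Vec<(usize, usize)> {
    assert!(k >= 0, "PRECEDING offset must be non-negative");
    let mut bounds = Vec::with_capacity(keys.len());
    let mut start: usize = 0;
    let mut end: usize = 0;
    for (i, &key) in keys.iter().enumerate() {
        // Skip rows whose key is more than k below the current key.
        let lower = key.saturating_sub(k);
        while keys[start] < lower {
            start += 1;
        }
        // In RANGE mode, CURRENT ROW includes all peers (rows with an equal key).
        while end < keys.len() && keys[end] == key {
            end += 1;
        }
        debug_assert!(start <= i && i < end);
        bounds.push((start, end));
    }
    bounds
}
```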
   
     ## Intra-operator parallelism
   
     Alternative/complementary approach: keep the logical window as one partition, but parallelize execution inside the operator.
   
     - Paper: [Efficient Processing of Window Functions in Analytical SQL Queries](https://www.vldb.org/pvldb/vol8/p1058-leis.pdf), PVLDB 2015
     - General issue from @2010YOUY01: https://github.com/apache/datafusion/issues/23174
     - Window-specific issue from @2010YOUY01: https://github.com/apache/datafusion/issues/22355
     - PoC from @2010YOUY01: https://github.com/apache/datafusion/pull/22356
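To illustrate the intra-operator idea: the Leis et al. paper evaluates window aggregates with a segment tree, which lets each output row's frame be aggregated independently in O(log n), so the output rows of one huge window partition can be split across threads that share a single read-only tree. The Rust sketch below is a simplified, hypothetical version (`SumTree`, `parallel_window_sum`) that handles only `SUM` over `ROWS BETWEEN n PRECEDING AND CURRENT ROW` and builds the tree sequentially; it is not the code from the linked PoC.

```rust
use std::thread;

/// Bottom-up segment tree of sums: leaves live in `tree[n..2n]` and each
/// internal node `i` holds `tree[2i] + tree[2i + 1]`.
pub struct SumTree {
    n: usize,
    tree: Vec<i64>,
}

impl SumTree {
    pub fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![0; 2 * n];
        tree[n..].copy_from_slice(values);
        for i in (1..n).rev() {
            tree[i] = tree[2 * i] + tree[2 * i + 1];
        }
        SumTree { n, tree }
    }

    /// Sum of `values[lo..hi]` (half-open) in O(log n).
    pub fn query(&self, lo: usize, hi: usize) -> i64 {
        let (mut lo, mut hi) = (lo + self.n, hi + self.n);
        let mut sum: i64 = 0;
        while lo < hi {
            if lo % 2 == 1 {
                sum += self.tree[lo];
                lo += 1;
            }
            if hi % 2 == 1 {
                hi -= 1;
                sum += self.tree[hi];
            }
            lo /= 2;
            hi /= 2;
        }
        sum
    }
}

/// SUM(x) OVER (ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW) for one
/// window partition, with the output rows split across `threads` threads.
pub fn parallel_window_sum(values: &[i64], preceding: usize, threads: usize) -> Vec<i64> {
    let mut out: Vec<i64> = vec![0; values.len()];
    if values.is_empty() {
        return out;
    }
    let tree = SumTree::new(values);
    let tree_ref = &tree; // shared read-only by all threads
    let chunk_len = values.len().div_ceil(threads.max(1));
    thread::scope(|s| {
        for (c, chunk) in out.chunks_mut(chunk_len).enumerate() {
            s.spawn(move || {
                for (j, slot) in chunk.iter_mut().enumerate() {
                    let row = c * chunk_len + j;
                    *slot = tree_ref.query(row.saturating_sub(preceding), row + 1);
                }
            });
        }
    });
    out
}
```

Unlike the halo approach, no rows are copied between partitions; the trade-off is an O(log n) tree query per row instead of an O(1) running-sum update.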
   
     ## Adaptive Query Execution / Runtime-Informed Planning
   
     Relevant if DataFusion wants a general framework for runtime stats, dynamic split points, repartition choices, skew handling, and similar optimizations.
   
     - AQE issue from @avantgardnerio: https://github.com/apache/datafusion/issues/23194
     - AQE-lite PoC from @avantgardnerio: https://github.com/apache/datafusion/pull/23167
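As one example of a runtime-informed decision: instead of hashing on a key, an operator could sample the sort keys it actually sees, pick evenly spaced quantiles of that sample as split points, and route each row to a contiguous key range, so ranges come out roughly equal in size even under skew. A minimal Rust sketch with hypothetical function names follows; it is not the API proposed in the linked issues. For a bounded frame, each range would additionally need the halo rows described in the "Halo" Rows section.

```rust
/// Chooses `n - 1` split points at evenly spaced quantiles of a runtime sample
/// of sort keys, so the `n` resulting key ranges get similar numbers of rows.
pub fn split_points_from_sample(mut sample: Vec<i64>, n: usize) -> Vec<i64> {
    assert!(n > 0, "need at least one output range");
    if sample.is_empty() {
        return Vec::new();
    }
    sample.sort_unstable();
    (1..n).map(|i| sample[i * sample.len() / n]).collect()
}

/// Routes `key` to a range: the number of split points that are <= `key`.
pub fn range_partition(key: i64, splits: &[i64]) -> usize {
    splits.partition_point(|&s| s <= key)
}
```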
   
     ## Related Sort / Merge Parallelism
   
     Relevant because single-partition windows often sit downstream of sort / merge bottlenecks.
   
     - Parallel sort-preserving merge PR from @Dandandan: https://github.com/apache/datafusion/pull/23124
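For context, this is what the step looks like when done sequentially: a sort-preserving merge repeatedly takes the smallest head element among the sorted input partitions, so a single thread touches every row before the window operator sees it. The Rust sketch below is a hypothetical heap-based baseline (`k_way_merge`), not the approach taken in the linked PR.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges partitions that are each sorted ascending into one sorted output,
/// using a min-heap of (value, partition index, position within partition).
pub fn k_way_merge(partitions: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first row of every non-empty partition.
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&first) = part.first() {
            heap.push(Reverse((first, p, 0usize)));
        }
    }
    let total: usize = partitions.iter().map(Vec::len).sum();
    let mut out = Vec::with_capacity(total);
    // Pop the smallest head, then refill from the partition it came from.
    while let Some(Reverse((value, p, idx))) = heap.pop() {
        out.push(value);
        if let Some(&next) = partitions[p].get(idx + 1) {
            heap.push(Reverse((next, p, idx + 1)));
        }
    }
    out
}
```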


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

