[ https://issues.apache.org/jira/browse/LUCENE-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17498591#comment-17498591 ]

Suhan Mao commented on LUCENE-10427:
------------------------------------

[~jpountz] Thanks for your reply!

As far as I know, the current rollup implementation in ES periodically runs a 
composite aggregation to compute the aggregated result and inserts it into 
another index.

*But this approach has several disadvantages:*
 # It still needs to keep the detailed data, which may not be needed by users 
who only want to run aggregate queries. This is what ClickHouse's 
AggregatingMergeTree or Druid does today: they only store aggregated data. But 
as you mentioned, we can create a sidecar index beside the raw data index; I 
think that is also acceptable as a first step.
 # Composite aggregation can cause OOM problems and may be very slow if the 
data volume is large. For example, if the rollup granularity is 1h, the 
composite aggregation has to extract all the buckets within that hour from the 
raw data.
 # Cronjob-like scheduled queries cannot handle late-arriving data: if data 
belonging to a previous time interval arrives after the query has already run, 
it is ignored in the rollup index, which causes accuracy issues.
 # From a resource-consumption perspective, if we must merge segments anyway, 
why not do the rollup as part of the merge within the same I/O round?
 # If the ES rollup granularity is one hour, the latest hour of data is not 
visible in the rollup index because the hourly scheduled composite aggregation 
query has not run yet.

*To answer your questions*
 - Q: Different segments would have different granularities.
 - A: All segments within one index share the same granularity, which is an 
index-level setting; this granularity is probably the minimum query granularity 
required by the user.
 - Q: Merges would no longer only combine segments but also perform lossy 
compression.
 - A: Yes. The doc count will be heavily reduced after a merge, and this is 
expected, because the smaller data volume speeds up queries.
 - Q: All file formats would need to be aware of rollups?
 - A: Currently I have implemented rollup support for several formats: doc 
values, BKD tree, FST ... These are the most commonly used in OLAP scenarios.
 - Q: Numeric doc values would need to be able to store multiple fields under 
the hood (min, max, etc.).
 - A: Doc values do not need to store aggregation semantics under the hood; we 
can store that information in the index settings. All supported aggregate 
operators should be associative and commutative, for example 
max(a,b,c) = max(a, max(b,c)), sum(a,b,c) = sum(a, sum(b,c)), and 
hll_union(a,b,c) = hll_union(a, hll_union(b,c)) if the data type is binary. So 
the format of a given field in doc values is always the same, and doc values 
cannot tell whether a doc holds raw data or aggregated data (see the sketch 
after this list).
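
To make the associativity point concrete, here is a minimal, hypothetical 
sketch (not taken from my patch; MetricFolder and its method names are made 
up) of how a metric can be folded one doc at a time during a merge, regardless 
of whether each incoming value is raw or already aggregated:

{code:java}
import java.util.function.LongBinaryOperator;

public final class MetricFolder {
  // Any associative and commutative operator works here (sum, min, max, ...).
  private final LongBinaryOperator op;
  private long acc;
  private boolean empty = true;

  public MetricFolder(LongBinaryOperator op) {
    this.op = op;
  }

  // Fold the next doc's value of the current bucket into the running result.
  public void accept(long value) {
    acc = empty ? value : op.applyAsLong(acc, value);
    empty = false;
  }

  public long result() {
    return acc;
  }

  public static void main(String[] args) {
    // max(a, b, c) == max(a, max(b, c)): grouping does not matter, so a
    // pre-aggregated doc can be folded exactly like a raw doc.
    MetricFolder max = new MetricFolder(Math::max);
    max.accept(3); max.accept(7); max.accept(5);
    System.out.println(max.result()); // 7

    MetricFolder sum = new MetricFolder(Long::sum);
    sum.accept(3); sum.accept(7); sum.accept(5);
    System.out.println(sum.result()); // 15
  }
}
{code}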

*How we can start from scratch*

I think we can start with a sidecar solution first. Assume that index A is the 
index storing the raw data, and index A' is a sidecar index that is 
continuously rolled up.

Assume that the schema of index A is:

d0 time, d1 long, d2 keyword, m1 long, m2 long, m3 binary(hll), x1, x2, x3 ...

The x1, x2 and x3 fields are not related to the rollup; they are just 
additional normal fields.

d0 is the event time, d1 and d2 are dimensions, and m1, m2 and m3 are metrics.

If we want to roll the data up to hourly granularity, we can create a rollup 
sidecar index A' that only contains the d0, d1, d2, m1, m2 and m3 fields and 
performs the rollup during the merge process. Users can submit queries to A or 
A' accordingly.

What's more, we can create several rollup indices, which are often called 
"materialized views" in OLAP scenarios.

For example, if we need another view that only stores d0, d1 and m3 at daily 
rollup granularity, we can create an additional sidecar index A''.

Users only need to write the raw data once to index A, and all the rollup 
calculation happens inside Lucene. Users then submit queries to the appropriate 
level of index accordingly. A hypothetical configuration for A' and A'' is 
sketched below.
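
For illustration only, here is how the rollup definitions of A' and A'' might 
look using the RollupMergeConfig classes from the issue description. The 
all-args constructors and the choice of aggregate types for m1/m2/m3 are 
assumptions, not part of the actual patch, and the hourly/daily truncation of 
d0 is assumed to be handled by the index-level granularity setting mentioned 
above.

{code:java}
import java.util.List;

public class SidecarRollupExample {
  public static void main(String[] args) {
    // Hypothetical: index A' keeps dimensions d0 (truncated to the hour), d1, d2
    // and rolls up metrics m1 (sum), m2 (max) and m3 (HLL union) during merges.
    // Assumes all-args constructors exist for the config classes.
    RollupMergeConfig hourlyView = new RollupMergeConfig(
        List.of("d0", "d1", "d2"),
        List.of(
            new RollupMergeAggregateField("m1", RollupMergeAggregateType.SUM),
            new RollupMergeAggregateField("m2", RollupMergeAggregateType.MAX),
            new RollupMergeAggregateField("m3", RollupMergeAggregateType.CARDINALITY)));

    // Hypothetical: index A'' is a daily materialized view over d0 and d1,
    // keeping only the m3 sketch.
    RollupMergeConfig dailyView = new RollupMergeConfig(
        List.of("d0", "d1"),
        List.of(
            new RollupMergeAggregateField("m3", RollupMergeAggregateType.CARDINALITY)));
  }
}
{code}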

 

What do you think?

 

 

 

> OLAP likewise rollup during segment merge process
> -------------------------------------------------
>
>                 Key: LUCENE-10427
>                 URL: https://issues.apache.org/jira/browse/LUCENE-10427
>             Project: Lucene - Core
>          Issue Type: New Feature
>            Reporter: Suhan Mao
>            Priority: Major
>
> Currently, many OLAP engines support a rollup feature, e.g. ClickHouse 
> (AggregatingMergeTree) and Druid.
> Rollup definition: [https://athena.ecs.csus.edu/~mei/olap/OLAPoperations.php]
> One way to do rollup is to merge docs with the same dimension values into one 
> bucket and apply sum()/min()/max() operations on the metric fields during the 
> segment compaction/merge process. This can significantly reduce the size of 
> the data and speed up queries a lot.
>  
> *Abstraction of how to do it*
>  # Define the rollup logic: which fields are dimensions and which are metrics.
>  # Define the rollup operation for each metric field: max/min/sum ...
>  # Index sorting should be on the same fields as the dimension fields.
>  # We do the rollup calculation during segment merge, just like other OLAP 
> engines do.
>  
> *Assume the scenario*
> We use ES to ingest real-time raw temperature data from each sensor device 
> every minute, along with many dimension fields. Users may want to query the 
> data like "what is the max temperature of some device within some/the latest 
> hour" or "what is the max temperature of some city within some/the latest hour".
> In that case, we can define the following fields and rollup definitions:
>  # event_hour(round to hour granularity)
>  # device_id(dimension)
>  # city_id(dimension)
>  # temperature(metrics, max/min rollup logic)
> The raw data will periodically be rolled up to hourly granularity during the 
> segment merge process, which ideally should save 60x storage in the end.
>  
> *How we do rollup in segment merge*
> Bucket: docs belong to the same bucket if all their dimension values are the 
> same.
>  # For the doc values merge, we emit the normal mappedDocId when we encounter 
> a new bucket in DocIDMerger.
>  # Since the index sorting fields are the same as the dimension fields, if we 
> encounter more docs in the same bucket, we emit a special mappedDocId from 
> DocIDMerger.
>  # In DocValuesConsumer.mergeNumericField, if we meet the special mappedDocId, 
> we do a rollup calculation on the metric fields and fold the result value 
> into the first doc of the bucket. The calculation works like a streaming 
> merge-sort rollup.
>  # We discard all the special mappedDocId docs because the metrics are 
> already folded into the first doc of the bucket.
>  # In the BKD/postings structures, we discard all the special mappedDocId 
> docs and only place the first doc id of each bucket in the BKD/postings data. 
> It should be simple.
>  
> *How to define the logic*
>  
> {code:java}
> public class RollupMergeConfig {
>   private List<String> dimensionNames;
>   private List<RollupMergeAggregateField> aggregateFields;
> }
>
> public class RollupMergeAggregateField {
>   private String name;
>   private RollupMergeAggregateType aggregateType;
> }
>
> public enum RollupMergeAggregateType {
>   COUNT,
>   SUM,
>   MIN,
>   MAX,
>   CARDINALITY // if a data sketch is stored in binary doc values, we can do a union
> }
> {code}
>  
>  
> I have written the initial code at a basic level. I can submit a complete PR 
> if you think this feature is worth trying.
>  
>  
>  
>  


