[ 
https://issues.apache.org/jira/browse/LUCENE-10427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suhan Mao updated LUCENE-10427:
-------------------------------
    Description: 
Currently, many OLAP engines support a rollup feature, e.g. ClickHouse (AggregatingMergeTree) and Druid.

Rollup definition: [https://athena.ecs.csus.edu/~mei/olap/OLAPoperations.php]

One way to do rollup is to merge docs that fall into the same dimension bucket and apply sum()/min()/max() operations to the metric fields during the segment compaction/merge process. This can significantly reduce the size of the data and speed up queries.

 

*Abstraction of how to do it*
 # Define the rollup logic: which fields are dimensions and which are metrics.
 # Define a rollup function for each metric field: max/min/sum ...
 # The index sort must use the same fields as the dimensions (see the sketch after this list).
 # Perform the rollup calculation during segment merges, just as other OLAP engines do.
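A minimal sketch of step 3, configuring the index sort over the dimension fields from the scenario below (this uses standard Lucene APIs; the rollup wiring itself is not shown):
{code:java}
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;

public class RollupIndexSortExample {
  // Sort segments by the dimension fields so that docs belonging to the
  // same rollup bucket are adjacent when segments are merged.
  static IndexWriterConfig newConfig() {
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    iwc.setIndexSort(new Sort(
        new SortedNumericSortField("event_hour", SortField.Type.LONG),
        new SortedNumericSortField("device_id", SortField.Type.LONG),
        new SortedNumericSortField("city_id", SortField.Type.LONG)));
    return iwc;
  }
}
{code}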

 

*An example scenario*

Suppose we use ES to ingest realtime raw temperature readings from each sensor device every minute, along with many dimension fields. Users may want to query the latest data, e.g. "what is the max temperature of some device within a given/the latest hour" or "what is the max temperature of some city (a dimension) within a given/the latest hour".

We can then define the following fields and rollup logic:
 # event_hour (rounded to hour granularity)
 # device_id (dimension)
 # city_id (dimension)
 # temperature (metric, max/min rollup logic)

The raw data will periodically be rolled up to hour granularity during the segment merge process, which should ideally save 60x storage in the end.

For each sensor device, the 60 rows within one hour will ideally be merged into 1 row.
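A minimal sketch of how one raw reading could be indexed as doc values under this schema (the concrete values and the fixed-point encoding of temperature are illustrative assumptions, not part of the proposal):
{code:java}
import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;

public class SensorReadingExample {
  // One raw reading: device 42 in city 7 reports 21.5 degrees.
  static Document newReading(long epochMillis) {
    Document doc = new Document();
    // Round the timestamp down to the hour so that all readings of the
    // same device within one hour share one rollup bucket.
    doc.add(new SortedNumericDocValuesField("event_hour", epochMillis / 3_600_000L));
    doc.add(new SortedNumericDocValuesField("device_id", 42L));
    doc.add(new SortedNumericDocValuesField("city_id", 7L));
    // Numeric doc values are longs, so store the metric as fixed-point.
    doc.add(new NumericDocValuesField("temperature", (long) (21.5 * 100)));
    return doc;
  }
}
{code}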

 

*How we do rollup in segment merge*

Bucket: docs belong to the same bucket if all of their dimension values are the same.
 # For the doc values merge, DocIDMerger emits the normal mappedDocId when we encounter a new bucket.
 # Since the index sorting fields are the same as the dimension fields, DocIDMerger emits a special mappedDocId when we encounter further docs in the same bucket.
 # In DocValuesConsumer.mergeNumericField, when we meet a special mappedDocId, we run the rollup calculation on the metric fields and fold the result into the first doc of the bucket. The calculation works like a streaming merge-sort rollup (see the sketch after this list).
 # We discard all docs with a special mappedDocId, because their metrics have already been folded into the first doc of the bucket.
 # In the BKD/postings index structures, we simply discard all docs with a special mappedDocId; no rollup calculation is needed there.
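A simplified, self-contained sketch of the streaming fold from step 3. It operates on a stream of (bucket key, metric value) pairs already sorted by bucket, mirroring the index-sort order during a merge; the actual DocIDMerger/DocValuesConsumer plumbing is not shown:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class StreamingRollup {
  // bucketKeys[i] is the bucket of values[i]; input is sorted by bucket.
  // Returns one {bucketKey, maxValue} row per bucket.
  static List<long[]> rollupMax(long[] bucketKeys, long[] values) {
    List<long[]> out = new ArrayList<>();
    long bucket = 0, acc = 0;
    boolean open = false;
    for (int i = 0; i < bucketKeys.length; i++) {
      if (open && bucketKeys[i] == bucket) {
        // Same bucket: fold the metric into the running aggregate.
        acc = Math.max(acc, values[i]);
      } else {
        // New bucket: emit the finished one and start accumulating.
        if (open) out.add(new long[] {bucket, acc});
        bucket = bucketKeys[i];
        acc = values[i];
        open = true;
      }
    }
    if (open) out.add(new long[] {bucket, acc});
    return out;
  }
}
{code}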

 

*How to define the logic*

 
{code:java}
import java.util.List;

public class RollupMergeConfig {
  private List<String> dimensionNames;                     // the bucket key
  private List<RollupMergeAggregateField> aggregateFields; // the metrics
}

public class RollupMergeAggregateField {
  private String name;
  private RollupMergeAggregateType aggregateType;
}

public enum RollupMergeAggregateType {
  COUNT,
  SUM,
  MIN,
  MAX,
  CARDINALITY // if a data sketch is stored in binary doc values, we can apply a union
}
{code}
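For example, the sensor scenario above could be expressed like this (assuming the classes above gain all-args constructors, which the sketch omits):
{code:java}
// Hypothetical wiring for the temperature example: event_hour, device_id
// and city_id form the bucket; temperature keeps its per-bucket max.
RollupMergeConfig config = new RollupMergeConfig(
    List.of("event_hour", "device_id", "city_id"),
    List.of(new RollupMergeAggregateField("temperature", RollupMergeAggregateType.MAX)));
{code}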
 

 

I have written some initial code at a basic level. I can submit a complete PR if you think this feature is worth trying.


> OLAP-like rollup during segment merge process
> ----------------------------------------------
>
>                 Key: LUCENE-10427
>                 URL: https://issues.apache.org/jira/browse/LUCENE-10427
>             Project: Lucene - Core
>          Issue Type: New Feature
>            Reporter: Suhan Mao
>            Priority: Major
>

