HappenLee commented on issue #3438:
URL: https://github.com/apache/incubator-doris/issues/3438#issuecomment-625759402


   
   ### About the POC
   At present, only the aggregatenode has been verified.
   * 1. First, we convert tuples into column vectors, and **the time consumed by the conversion operation is recorded.**
   ```
   {
       Timer time;
       // Convert the row-oriented batch into column vectors: gather each
       // aggregate value and each group-by key into a contiguous
       // per-column array.
       FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
           // Hoist lookups out of non-null branch to speed up non-null case.
           TupleRow* in_row = in_batch_iter.get();
           for (int i = 0; i < agg_fn_evals_.size(); ++i) {
               void** values = static_cast<void**>(valuesVector[i].col_data());
               values[count] = agg_fn_evals_[i]->getAggValue(in_row);
           }
           for (int i = 0; i < ht_ctx->get_key_size(); ++i) {
               void** keys = static_cast<void**>(keysVector[i].col_data());
               keys[count] = ht_ctx->get_key(in_row, i);
           }
           count++;
       }
       // Record the conversion cost so it can be deducted from the total
       // aggregation time when measuring the columnar execution itself.
       _cost_time += time.stop();
   }
   ```
   * 2. Then we aggregate using the column storage:
   ```
   EvalAndHashPrefetchGroup(keysVector, count, ht_ctx, remaining_capacity, tuples);
   for (int i = 0; i < agg_fn_evals_.size(); ++i) {
       UpdateTuple(agg_fn_evals_[i], tuples,
                   static_cast<void**>(valuesVector[i].col_data()), count);
   }
   ```
   
   * 3. Execute the following query statement against a table of 70
   million rows: ```select max(C_PHONE) from customer group by C_MKTSEGMENT;```

   Counting the time of aggregation, we find that after deducting the time of row
   conversion, **the time is cut in half**.
   
   Statistic | origin | row converted to column
   :-:|:-:|:-:
   Time | 4.19 sec | 2.47 sec
    
   * 4. In this case, the performance is improved through the **execution model
   of the column store**.
      In addition to the aggregatenode, another part of the POC is expression
   evaluation and filtering, e.g. whether AND/OR filter calculations
   can be improved by vectorization.
   **Join and sort are out of scope for this phase, so if a query contains a join
   or a sort, query planning must be able to identify it and fall back to the
   non-vectorized execution path. A sketch of such a check follows.**
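   As a minimal sketch of such a planner-side check (the node types and the
   function below are hypothetical names for illustration, not the actual Doris
   planner API):
   ```
   #include <vector>

   // Hypothetical mini plan-node model, for this sketch only.
   enum class NodeType { SCAN, AGGREGATION, HASH_JOIN, SORT };

   struct PlanNode {
       NodeType type;
       std::vector<PlanNode*> children;
   };

   // Walk the plan tree and refuse vectorized execution as soon as a join
   // or sort node is found, since those are not vectorized in this phase.
   bool can_use_vectorized_exec(const PlanNode* node) {
       if (node == nullptr) return true;
       if (node->type == NodeType::HASH_JOIN || node->type == NodeType::SORT) {
           return false;
       }
       for (const PlanNode* child : node->children) {
           if (!can_use_vectorized_exec(child)) return false;
       }
       return true;
   }
   ```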
   
   ### Implementation and Design Detail
   There are three important parts: **rewriting of the scannode, rewriting of
   expression calculation, and rewriting of the aggregatenode**.
   
   
   #### Rewrite the scanner section first:
   * At present, for agg key and unique key, it is necessary to perform data
   pre-aggregation and other operations by heap sort. **It seems difficult to avoid
   the transformation between columns and rows, so we only implement vectorized
   execution for dup_key.**
   We can use **Rowblockv2 as an abstraction of the column storage model; it is
   used to transfer data between nodes**. We don't need to convert **Rowblockv2
   to the old Rowblock**.
   * Implement the vectorization of individual expressions (for example, the first
   phase may only implement simple comparison expressions such as =/</>). A minimal
   sketch of such a vectorized predicate is shown below.
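   To make the idea concrete, here is a minimal sketch of a vectorized `<`
   predicate over a single int32 column (the layout and names are assumptions
   for illustration, not the actual expression API):
   ```
   #include <cstdint>
   #include <vector>

   // Minimal sketch of a vectorized "col < constant" filter. Instead of
   // evaluating the predicate row by row through a virtual expression
   // call, we sweep the contiguous column array in a tight loop and emit
   // the ids of the rows that pass (a selection vector).
   std::vector<uint16_t> eval_less_than(const int32_t* col_data, int num_rows,
                                        int32_t constant) {
       std::vector<uint16_t> selected;
       selected.reserve(num_rows);
       for (int i = 0; i < num_rows; ++i) {
           if (col_data[i] < constant) {
               selected.push_back(static_cast<uint16_t>(i));
           }
       }
       return selected;
   }
   ```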
   
   We can make this judgment during query planning. If the query is on dup_key and
   contains only simple expressions,
   the vectorized interface will be called. The pseudo code is as follows:
   ```
   VectorizedBatch vbatch = _reader->next_batch_with_aggregation();
   vector<rowId> selectedRowId = Expr.eval(vbatch);
   convertVecBatchToRowBatch(vbatch, selectedRowId, output_rowbatch);
   ```
   **If there are other expressions that are not supported yet, continue to use
   the old interface.** The inputs of the new and old interfaces are different
   (one is a row, the other is a vbatch), but the output is the same (both are
   the original rowbatch of the query layer). A hypothetical sketch of the
   conversion step is shown below.
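   As a rough sketch of what `convertVecBatchToRowBatch` could do (the types and
   accessors here are toy stand-ins, only meant to show the shape of the
   conversion, not the real Doris classes):
   ```
   #include <cstdint>
   #include <utility>
   #include <vector>

   // Toy stand-ins for this sketch only.
   struct VecBatch {
       std::vector<std::vector<int64_t>> columns;  // column-major data
   };
   using Row = std::vector<int64_t>;

   // Materialize only the rows selected by the expression filter back into
   // a row-oriented batch, so the upper layer sees the same output as the
   // old row-based interface.
   void convert_vec_batch_to_row_batch(const VecBatch& vbatch,
                                       const std::vector<uint16_t>& selected_row_ids,
                                       std::vector<Row>* output_rows) {
       for (uint16_t row_id : selected_row_ids) {
           Row row;
           row.reserve(vbatch.columns.size());
           for (const auto& col : vbatch.columns) {
               row.push_back(col[row_id]);  // gather one cell per column
           }
           output_rows->push_back(std::move(row));
       }
   }
   ```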
   
   * Support more expression calculations, such as IN, FUNCTION and so on. **Finally,
   implement fully vectorized execution in the scanner stage.** The output of the
   scanner to the upper layer is still the old rowbatch, **but the internal layer
   and the storage layer are vectorized.** A sketch of a vectorized IN predicate
   follows the figure.
   ![After rewriting the scanner](https://upload-images.jianshu.io/upload_images/8552201-e1ba42199edb1a47.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
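   For IN, one possible vectorized form is a hash-set probe over the column
   array, as in the sketch below (illustrative names, not the actual API). Feeding
   one filter's selection vector into the next composes an AND without touching
   rejected rows:
   ```
   #include <cstdint>
   #include <unordered_set>
   #include <vector>

   // Minimal sketch of a vectorized IN predicate: build the constant set
   // once, then probe it for each row that survived the previous filter.
   std::vector<uint16_t> eval_in(const int32_t* col_data,
                                 const std::vector<uint16_t>& input_selection,
                                 const std::unordered_set<int32_t>& in_set) {
       std::vector<uint16_t> selected;
       selected.reserve(input_selection.size());
       for (uint16_t row_id : input_selection) {
           if (in_set.count(col_data[row_id]) != 0) {
               selected.push_back(row_id);
           }
       }
       return selected;
   }
   ```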
   
   
   #### Rewrite the scannode section second:

   * **Rewrite the interface between the scan node and the scanner to be
   vectorized.** The ultimate goal is that the output of the upper layer of the
   scan node is the old rowbatch, but the output of the lower layer of the scan
   node (scanner and storage layer) is vectorized. A sketch of this boundary is
   shown below.
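   The boundary could look roughly like this (toy types and method names, not
   the real Doris classes):
   ```
   // Toy types for this sketch only.
   struct RowBatch {};         // row-oriented batch handed to the parent node
   struct VectorizedBatch {};  // column-oriented batch from the scanner

   // Stand-in for the boundary conversion shown in the earlier sketch.
   void convert_vec_batch_to_row_batch(const VectorizedBatch&, RowBatch*) {}

   struct OlapScanner {
       // New downward-facing interface: the scanner and the storage layer
       // exchange column-oriented batches.
       bool next_vectorized_batch(VectorizedBatch* vbatch) { return false; }
   };

   struct OlapScanNode {
       OlapScanner* scanner = nullptr;

       // Unchanged upward-facing interface: the parent exec node still
       // receives the old row-oriented batch. The row/column conversion
       // happens exactly once, at this boundary.
       bool get_next(RowBatch* row_batch) {
           VectorizedBatch vbatch;
           bool has_more = scanner->next_vectorized_batch(&vbatch);
           convert_vec_batch_to_row_batch(vbatch, row_batch);
           return has_more;
       }
   };
   ```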
   
   So far, the changes to the scan part and the expr part are basically complete. We
   can rewrite the aggregate node now.
   ![After rewriting the olap_scan_node](https://upload-images.jianshu.io/upload_images/8552201-213bb59042197868.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
   
   #### Rewrite the aggregatenode section finally:
   * Design and implement a new hash table structure that is compatible with
   the column storage model
   * Rewrite the computing model of the corresponding aggregation operators to
   support vectorization (a rough illustration follows this list)

   * After all of that, **the final goal of the first phase is to vectorize the
   aggregate query of a single table.**
   If there is a join of multiple tables or a sort node, the old execution
   logic is still used.
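   As a rough illustration of the column-oriented aggregation model (all names
   here are hypothetical, not the planned Doris implementation):
   ```
   #include <cstdint>
   #include <limits>
   #include <unordered_map>
   #include <vector>

   // Minimal sketch of column-oriented hash aggregation: resolve every key
   // of the batch to its slot first, then let each aggregate function
   // sweep a contiguous value column in a tight loop.
   void aggregate_max_batch(const std::vector<int64_t>& keys,    // group-by column
                            const std::vector<int64_t>& values,  // aggregated column
                            std::unordered_map<int64_t, int64_t>* max_state) {
       // Phase 1: one hash-table probe per row, yielding a slot per row.
       std::vector<int64_t*> slots(keys.size());
       for (size_t i = 0; i < keys.size(); ++i) {
           auto it = max_state->try_emplace(
                   keys[i], std::numeric_limits<int64_t>::min()).first;
           slots[i] = &it->second;  // element references survive rehashing
       }
       // Phase 2: the max() update runs over the contiguous value column;
       // with more aggregate functions, each gets its own loop.
       for (size_t i = 0; i < values.size(); ++i) {
           if (*slots[i] < values[i]) *slots[i] = values[i];
       }
   }
   ```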
   
   
   ![After rewriting the Aggregation_node](https://upload-images.jianshu.io/upload_images/8552201-54681e865bbd8c1c.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
   
   Later, we can gradually extend vectorized calculation to other
   execution nodes. **It's going to be a long-term plan.**
   
   #### Relationship between Rowblock, Rowblockv2 and Columnblock
   @kangkaisen
   * **Rowblock is the old definition. At present, the storage layer and query
   layer still use the rowblock interface to pass data.** The query layer accesses
   the data inside a rowblock through a rowcursor. Data in a rowblock is stored
   row by row.
   * **Rowblockv2 is a new definition which is vectorized. It contains multiple
   columnblocks.** Each columnblock is a column of data; Rowblockv2 is in effect
   the newly defined vectorized rowbatch. New segment V2 data is read in rowblockv2
   format. The BetaRowsetReader will convert rowblockv2 to rowblock to return the
   data. A simplified view of the relationship is sketched below.
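   As a simplified illustration only (the real Doris classes have richer
   interfaces):
   ```
   #include <cstddef>
   #include <cstdint>
   #include <vector>

   // Simplified sketch: Rowblockv2 is column-oriented and owns one
   // columnblock per column, each holding that column's values
   // contiguously plus per-row null information.
   struct ColumnBlock {
       std::vector<uint8_t> data;         // contiguous values of one column
       std::vector<uint8_t> null_bitmap;  // one bit per row
   };

   struct RowBlockV2 {
       std::vector<ColumnBlock> columns;  // one ColumnBlock per column
       size_t num_rows = 0;
   };
   ```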
   
   * The reader gets one row at a time through the internal collect
   iterator. The collect iterator merges all rowset readers and returns data row
   by row. **The merge operations here are based on rowcursor and rowblock, so the
   reader needs to be rewritten first.** An illustrative sketch of this merge
   follows.
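   For context, the current merge is conceptually a row-at-a-time k-way heap
   merge like the sketch below (illustrative types, not the real reader code),
   which is why it resists a purely columnar read path:
   ```
   #include <cstdint>
   #include <functional>
   #include <queue>
   #include <tuple>
   #include <vector>

   // Sketch of the row-based k-way merge: each step compares sort keys,
   // pops the smallest row, and advances that run by exactly one row.
   std::vector<int64_t> merge_sorted_runs(
           const std::vector<std::vector<int64_t>>& runs) {
       // (sort key, run index, offset within that run)
       using Entry = std::tuple<int64_t, size_t, size_t>;
       std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
       for (size_t r = 0; r < runs.size(); ++r) {
           if (!runs[r].empty()) heap.emplace(runs[r][0], r, 0);
       }
       std::vector<int64_t> out;
       while (!heap.empty()) {
           auto [key, r, off] = heap.top();
           heap.pop();
           out.push_back(key);  // agg_key/unique_key would pre-aggregate here
           if (off + 1 < runs[r].size()) {
               heap.emplace(runs[r][off + 1], r, off + 1);
           }
       }
       return out;
   }
   ```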
   
   * Now Doris has the dup_key, agg_key and unique_key storage models. Dup_key does
   not need aggregation logic. **So we can transform this part first, directly
   delivering the corresponding data to the upper layer through a columnvector
   rather than a row_cursor.** At present, for agg key and unique key, it is
   necessary to perform data pre-aggregation and other operations by heap sort. It
   seems difficult to avoid the transformation between columns and rows.
   

