Hi Mingyu,

On Tue, Oct 8, 2019 at 10:17 AM, 陈明雨 <morning...@163.com> wrote:
> Hi ZhaoChun:
>
> I have some questions:
>
> 1. How does the TableFunctionScanNode work with other ScanNodes?

TableFunctionScanNode works like any other ScanNode: it does not depend on
other ExecNodes, because it only generates data for its parent node. It
prepares the arguments for the table function, executes the function, and
returns the result to its parent. So there is nothing special about how
TableFunctionScanNode works with other ExecNodes.

> For example: select * from B left join lateral from (select * from B where
> B.c1 > A.c1)
>
> In this SQL, what ScanNodes will be generated for the subquery in the FROM
> clause? A single TableFunctionScanNode? Or an OlapScanNode followed by a
> TableFunctionScanNode?

In your example, LATERAL JOIN should be implemented somewhere other than
TableFunctionScanNode. Perhaps a LateralJoinNode could coordinate the
TableFunctionScanNode and the OlapScanNode, or we could introduce RESCAN in
the execution layer. However, this is not included in this proposal. When we
want to implement it, we will design it in more detail in a separate
proposal.

> 2. Can TableFunctionScanNode replace some implementations in GROUPING SET?

I'm afraid not. They have different objectives. TableFunctionScanNode's
objective is to execute table functions; GROUPING SET's is not.
TableFunctionScanNode is the more general of the two, because GROUPING SET
may be implemented through TableFunctionScanNode plus SQL rewrite.

Thanks,
Zhao Chun

> --
> Best Regards
> 陈明雨 Mingyu Chen
>
> Email:
> chenmin...@apache.org
>
> At 2019-09-30 18:00:34, "Zhao Chun" <zh...@apache.org> wrote:
> >Hi, all
> >
> >I want to support UDTF (user-defined table function) in Doris. The
> >following is the detailed design.
> >
> >## Motivation
> >
> >Currently, in some scenarios, users need to expand one row of data into
> >multiple rows. For example, a user's field may store a tag list, and he
> >wants to count the number of occurrences of each tag.
> >Take the following table as an example:
> >
> >| id | tags |
> >| --- | --- |
> >| 1 | tagA, tagC |
> >| 2 | tagA, tagB |
> >| 3 | tagB, tagD |
> >| 4 | tagE, tagF |
> >
> >If we can expand the above table according to the tags as follows, it is
> >easy to get how many times each tag appears with a simple SQL statement
> >like `SELECT count(*) from tbl GROUP BY tag`.
> >
> >| id | tag |
> >| --- | --- |
> >| 1 | tagA |
> >| 1 | tagC |
> >| 2 | tagA |
> >| 2 | tagB |
> >| 3 | tagB |
> >| 3 | tagD |
> >| 4 | tagE |
> >| 4 | tagF |
> >
> >However, Doris lacks this kind of functionality, so achieving the same
> >result today is difficult and inefficient.
> >
> >Besides making it easier for Doris users to analyze their data, UDTF can
> >help Doris display and analyze nested-type data better in the future. For
> >example, we can expand a JSON field into rows.
> >
> >We can change
> >
> >```
> >{"a": "foo", "b": "bar"}
> >```
> >
> >into rows like
> >
> >| a | b |
> >| --- | --- |
> >| foo | bar |
> >
> >or rows like
> >
> >| key | value |
> >| --- | --- |
> >| a | foo |
> >| b | bar |
> >
> >## Summary
> >
> >This work may be split into 3 phases.
> >
> >1. Implement analysis and execution of table functions. After this work
> >finishes, users can use built-in table functions. However, lateral join
> >is not supported at this point, so a table function's arguments can only
> >be constants, which cannot change within one query. The ability of table
> >functions is therefore still very limited.
> >2. Support user-defined table functions. After this work, developers can
> >create new table functions to solve their own problems.
> >3. Support lateral join. After this work, table functions will be more
> >complete, and users can use them to solve most problems. How to implement
> >this is not yet completely clear, so it remains a TODO.
> >
> >## Phase 1: Support Built-in Table Function
> >
> >### Query Plan
> >
> >To let users call table functions, we need to support the corresponding
> >syntax. Doris currently supports table references and subqueries in the
> >SQL FROM clause; table functions should be supported there as well. The
> >SQL analyzer must be able to identify a table function and check that it
> >is used properly. For example, a table function cannot be used in the
> >WHERE or GROUP BY clause.
> >
> >Once the analyzer has analyzed the table function's usage, the SQL
> >planner should generate an execution plan for it. The coordinator should
> >then schedule this plan onto some backend for execution.
> >
> >### Table Function Execution
> >
> >To execute a table function in the backend, we should implement a new
> >`ExecNode` named `TableFunctionScanNode`. Its responsibility is to execute
> >table functions, collect their results, and feed them into the data
> >pipeline of this execution instance.
> >
> >This node's main work includes three steps. First, it prepares the
> >environment and parameters required by the table function, computing them
> >before each function call. The node may be executed multiple times
> >because the parameters change with the input values. To achieve this, we
> >need a `ReOpen` or `ReScan` interface that restarts the scan with
> >different parameters. We will postpone this until we support lateral
> >join. Second, it calls the table function and collects the function's
> >result. Third, it converts the result into a form that other `ExecNode`s
> >can recognize and handle. The parent of this node pulls data from it in
> >batches, so this node should return its results in batches as well.
> >
> >### UDTF Execution
> >
> >To keep the user experience consistent, we continue to use the UDF
> >implementation interface.
> >To implement a table function, the user should implement the three
> >functions below. The `process` interface is mandatory; the other two,
> >`prepare` and `close`, are optional and exist to optimize execution. For
> >example, some context can be created in `prepare` and saved in `ctx`,
> >then reused in every `process` call, which avoids creating the context on
> >each call.
> >
> >```
> >void func_prepare(doris_udf::FunctionContext* ctx,
> >                  doris_udf::FunctionContext::FunctionStateScope scope);
> >void func_process(doris_udf::FunctionContext* ctx,
> >                  const doris_udf::xxxVal& arg1 [, ...]);
> >void func_close(doris_udf::FunctionContext* ctx,
> >                doris_udf::FunctionContext::FunctionStateScope scope);
> >```
> >
> >Unlike other functions, a table function returns multiple rows for a
> >single call. This version assumes that the number of rows returned by
> >each call is not too large, for example fewer than 10000 rows, so a
> >single `process` call is used to obtain all the data produced by the
> >function. If functions need to return large amounts of data in the
> >future, we can add an incremental interface such as `get_next()`; the
> >result would then be fetched by calling `get_next` repeatedly.
> >
> >The data generated by `process` is stored in a `RecordStore`, which can
> >be obtained via `ctx->record_store()`. Memory used for the function's
> >result should be allocated from the `RecordStore`.
> >
> >The interface of `RecordStore` is defined as follows.
> >
> >```
> >class RecordStore {
> >public:
> >    // Allocate a record to store data.
> >    // The returned record can be added to this store by calling
> >    // append_record. If the returned record is not added back,
> >    // the client should call free_record to free it.
> >    Record* allocate_record();
> >
> >    // Append a record to this store. The input record must have been
> >    // returned by allocate_record of this RecordStore. Otherwise
> >    // the behavior is undefined.
> >    void append_record(Record* record);
> >
> >    // Free an unused record created by allocate_record.
> >    void free_record(Record* record);
> >
> >    // Allocate memory for a variable-length field in a record, such as
> >    // a string. The client need not free the allocated memory; it is
> >    // freed when this store is destroyed.
> >    void* allocate(size_t size);
> >private:
> >    RecordStoreImpl* _impl;
> >};
> >```
> >
> >The interface of each `Record` is as follows.
> >
> >```
> >class Record {
> >public:
> >    // Set field idx to null.
> >    void set_null(int idx);
> >
> >    // Set field idx to val as an int.
> >    void set_int(int idx, int val);
> >
> >    // Set field idx to the string at ptr with length len. This function
> >    // uses the input buffer directly without copying, so the client
> >    // should allocate the memory from RecordStore.
> >    void set_string(int idx, const uint8_t* ptr, int len);
> >};
> >```
> >
> >Below is a simple example of implementing a table function.
> >
> >```
> >// Duplicate the input argument `times` times. All results are saved
> >// in the RecordStore.
> >void duplicate(FunctionContext* context, const IntVal& times,
> >               const StringVal& val) {
> >    RecordStore* store = context->record_store();
> >    for (int i = 0; i < times.val; ++i) {
> >        Record* record = store->allocate_record();
> >
> >        // set index
> >        record->set_int(0, i);
> >
> >        // set value; copy it into memory owned by the store
> >        uint8_t* ptr = static_cast<uint8_t*>(store->allocate(val.len));
> >        memcpy(ptr, val.ptr, val.len);
> >        record->set_string(1, ptr, val.len);
> >
> >        store->append_record(record);
> >    }
> >}
> >```
> >
> >The implementation framework requires two sets of code: one for Doris
> >internals and the other for the SDK. Why do we need two sets of code?
> >The simple reason is execution performance. I want a table function's
> >result to be processed directly by other execution nodes without a memory
> >copy. However, to keep the table function decoupled from the internal
> >implementation, the internal implementation should not be exposed to
> >UDTF users. This allows the internal format to change in the future while
> >existing UDTFs continue to work. So we need to implement the interface
> >twice, to separate the SDK from the internal implementation of Doris.
> >
> >After the above work, we can support built-in table functions in Doris.
> >
> >## Phase 2: Support User-Defined Table Function
> >
> >### SQL Syntax
> >
> >The syntax to create a UDTF is as follows. Compared to a UDF, a UDTF also
> >needs to declare the schema of the table it returns; everything else is
> >the same as for a UDF.
> >
> >```
> >CREATE FUNCTION func_name (arg_type [, arg_type])
> >RETURNS TABLE (col_name col_type [, ...])
> >[PROPERTIES ("key" = "value" [, ...]) ]
> >```
> >
> >### Meta Store
> >
> >Internally, we need to save UDTF-related metadata.
> >
> >## Phase 3: Support Lateral Join
> >
> >After lateral join is supported, a table function can be called multiple
> >times within a single query, as in the following example:
> >
> >```
> >SELECT * from tbl as a, LATERAL table_func(a.c1, a.c2);
> >```
> >
> >What we should support is as follows.
> >
> >1. Query parsing supports the LATERAL syntax
> >2. The planner can correctly plan a lateral join
> >3. Support a Lateral Join Node, or a parameter system covering the entire
> >query
> >
> >How to achieve this is still a TODO.
> >
> >If you have any thoughts on this, please let me know. Looking forward to
> >your feedback.
> >
> >I have already created an issue [1] on GitHub too.
> >
> >1. https://github.com/apache/incubator-doris/issues/1922
> >
> >Thanks,
> >Zhao Chun
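
To make the `RecordStore` and `Record` interfaces from the quoted proposal easier to experiment with, here is a minimal, self-contained C++17 sketch. It is only an illustration under stated assumptions, not the Doris implementation: `IntVal`, `StringVal`, and `FunctionContext` are simplified stand-ins for the SDK types, the constructor arguments, the read accessors (`get_int`, `get_string`, `size`, `row`), and the `take_pending` helper are hypothetical, and the proposal's opaque `RecordStoreImpl` is replaced by plain vectors.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <variant>
#include <vector>

// Simplified stand-ins for the SDK value types (assumption: the real
// doris_udf types carry more state, e.g. an is_null flag).
struct IntVal { int val; };
struct StringVal { const uint8_t* ptr; int len; };

// One output row. Each field is null, an int, or a non-owning string view.
class Record {
    struct Null {};
    struct Str { const uint8_t* ptr; int len; };
    using Field = std::variant<Null, int, Str>;
    std::vector<Field> _fields;  // every field starts out as Null
    Field& at(int idx) {
        assert(idx >= 0 && idx < static_cast<int>(_fields.size()));
        return _fields[idx];
    }
public:
    explicit Record(int num_fields) : _fields(num_fields) {}
    void set_null(int idx) { at(idx) = Null{}; }
    void set_int(int idx, int val) { at(idx) = val; }
    // Stores the pointer without copying, as described in the proposal.
    void set_string(int idx, const uint8_t* ptr, int len) { at(idx) = Str{ptr, len}; }
    // Hypothetical read accessors, added only so results can be inspected.
    int get_int(int idx) const { return std::get<int>(_fields.at(idx)); }
    std::string get_string(int idx) const {
        const Str& s = std::get<Str>(_fields.at(idx));
        return std::string(reinterpret_cast<const char*>(s.ptr), s.len);
    }
};

class RecordStore {
    int _num_fields;
    std::vector<std::unique_ptr<Record>> _pending;     // allocated, not yet appended
    std::vector<std::unique_ptr<Record>> _rows;        // appended result rows
    std::vector<std::unique_ptr<uint8_t[]>> _buffers;  // variable-length field memory
    // Remove a record from _pending and hand back its ownership (or nullptr).
    std::unique_ptr<Record> take_pending(Record* record) {
        for (auto it = _pending.begin(); it != _pending.end(); ++it) {
            if (it->get() == record) {
                std::unique_ptr<Record> owned = std::move(*it);
                _pending.erase(it);
                return owned;
            }
        }
        return nullptr;
    }
public:
    explicit RecordStore(int num_fields) : _num_fields(num_fields) {}
    Record* allocate_record() {
        _pending.push_back(std::make_unique<Record>(_num_fields));
        return _pending.back().get();
    }
    void append_record(Record* record) {
        std::unique_ptr<Record> owned = take_pending(record);
        assert(owned != nullptr);  // must come from allocate_record()
        _rows.push_back(std::move(owned));
    }
    void free_record(Record* record) { take_pending(record); }
    // Memory stays alive until the store is destroyed.
    void* allocate(size_t size) {
        _buffers.push_back(std::make_unique<uint8_t[]>(size));
        return _buffers.back().get();
    }
    size_t size() const { return _rows.size(); }
    const Record& row(size_t i) const { return *_rows.at(i); }
};

// Stand-in context that owns one store (assumption: fixed column count).
class FunctionContext {
    RecordStore _store;
public:
    explicit FunctionContext(int num_fields) : _store(num_fields) {}
    RecordStore* record_store() { return &_store; }
};

// The `duplicate` table function from the proposal: emits (index, value) rows.
void duplicate(FunctionContext* context, const IntVal& times, const StringVal& val) {
    RecordStore* store = context->record_store();
    for (int i = 0; i < times.val; ++i) {
        Record* record = store->allocate_record();
        record->set_int(0, i);
        auto* ptr = static_cast<uint8_t*>(store->allocate(val.len));
        std::memcpy(ptr, val.ptr, val.len);
        record->set_string(1, ptr, val.len);
        store->append_record(record);
    }
}
```

Calling `duplicate` on a `FunctionContext` built with two columns, with `times = 3`, leaves three rows in the store, each holding its index and a private copy of the input string. Because `set_string` keeps only a pointer, copying the value into store-owned memory first is what keeps each row valid after the caller's buffer goes away.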