This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 17eb8c00d3 [feature] add table valued function framework and numbers table valued function (#10214) 17eb8c00d3 is described below commit 17eb8c00d32737764bacffbb673c4e13e3ab8800 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Tue Jun 28 14:01:57 2022 +0800 [feature] add table valued function framework and numbers table valued function (#10214) --- be/src/exec/exec_node.cpp | 12 + be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/tablefunction/vnumbers_tbf.cpp | 83 +++ be/src/vec/exec/tablefunction/vnumbers_tbf.h | 54 ++ .../tablefunction/vtable_valued_function_inf.h | 55 ++ .../vec/exec/vtable_valued_function_scannode.cpp | 125 ++++ be/src/vec/exec/vtable_valued_function_scannode.h | 73 ++ docs/.vuepress/sidebar/en/docs.js | 3 +- docs/.vuepress/sidebar/zh-CN/docs.js | 3 +- .../sql-functions/string-functions/unhex.md | 2 +- .../sql-functions/table-functions/numbers.md} | 64 +- .../sql-functions/string-functions/unhex.md | 2 +- .../unhex.md => table-functions/numbers.md} | 65 +- fe/fe-core/src/main/cup/sql_parser.cup | 12 + .../java/org/apache/doris/analysis/SelectStmt.java | 4 + .../doris/analysis/TableValuedFunctionRef.java | 77 ++ .../java/org/apache/doris/catalog/TableIf.java | 5 +- .../apache/doris/planner/DistributedPlanner.java | 3 +- .../apache/doris/planner/SingleNodePlanner.java | 13 +- .../doris/planner/TableValuedFunctionScanNode.java | 108 +++ .../main/java/org/apache/doris/qe/Coordinator.java | 8 +- .../apache/doris/statistics/StatisticalType.java | 1 + .../tablefunction/NumbersTableValuedFunction.java | 114 +++ .../tablefunction/TableValuedFunctionInf.java | 51 ++ .../TableValuedFunctionTask.java} | 50 +- gensrc/thrift/PlanNodes.thrift | 22 + .../table_valued_function/test_numbers.out | 808 +++++++++++++++++++++ .../table_valued_function/test_numbers.groovy | 122 ++++ 28 files changed, 1840 insertions(+), 101 deletions(-) diff --git 
a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4030b552ac..b2de6bc180 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -79,6 +79,7 @@ #include "vec/exec/vselect_node.h" #include "vec/exec/vsort_node.h" #include "vec/exec/vtable_function_node.h" +#include "vec/exec/vtable_valued_function_scannode.h" #include "vec/exec/vunion_node.h" #include "vec/exprs/vexpr.h" @@ -391,6 +392,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::REPEAT_NODE: case TPlanNodeType::TABLE_FUNCTION_NODE: case TPlanNodeType::BROKER_SCAN_NODE: + case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -577,6 +579,15 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN } return Status::OK(); + case TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE: + if (state->enable_vectorized_exec()) { + *node = pool->add(new vectorized::VTableValuedFunctionScanNode(pool, tnode, descs)); + return Status::OK(); + } else { + error_msg << "numbers table function only support vectorized execution"; + return Status::InternalError(error_msg.str()); + } + default: map<int, const char*>::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -652,6 +663,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); + collect_nodes(TPlanNodeType::TABLE_VALUED_FUNCTION_SCAN_NODE, nodes); } void ExecNode::try_do_aggregate_serde_improve() { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index d6549701b0..ad03c00b81 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -109,6 +109,8 @@ set(VEC_FILES exec/vparquet_scanner.cpp exec/vorc_scanner.cpp exec/join/vhash_join_node.cpp + 
exec/tablefunction/vnumbers_tbf.cpp + exec/vtable_valued_function_scannode.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp exprs/vexpr.cpp diff --git a/be/src/vec/exec/tablefunction/vnumbers_tbf.cpp b/be/src/vec/exec/tablefunction/vnumbers_tbf.cpp new file mode 100644 index 0000000000..f05848ca61 --- /dev/null +++ b/be/src/vec/exec/tablefunction/vnumbers_tbf.cpp @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/tablefunction/vnumbers_tbf.h" + +#include <sstream> + +#include "exec/exec_node.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" +#include "util/runtime_profile.h" + +namespace doris::vectorized { + +VNumbersTBF::VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc) + : VTableValuedFunctionInf(tuple_id, tuple_desc) {} + +Status VNumbersTBF::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + bool mem_reuse = block->mem_reuse(); + DCHECK(block->rows() == 0); + std::vector<vectorized::MutableColumnPtr> columns(_slot_num); + + do { + for (int i = 0; i < _slot_num; ++i) { + if (mem_reuse) { + columns[i] = std::move(*(block->get_by_position(i).column)).mutate(); + } else { + columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + while (true) { + RETURN_IF_CANCELLED(state); + int batch_size = state->batch_size(); + if (columns[0]->size() == batch_size) { + // what if batch_size < _total_numbers, should we set *eos? + break; + } + // if _total_numbers == 0, so we can break loop at now. 
+ if (_cur_offset >= _total_numbers) { + *eos = true; + break; + } + columns[0]->insert_data(reinterpret_cast<const char*>(&_cur_offset), + sizeof(_cur_offset)); + ++_cur_offset; + } + auto n_columns = 0; + if (!mem_reuse) { + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + } while (block->rows() == 0 && !(*eos)); + return Status::OK(); +} + +Status VNumbersTBF::set_scan_ranges(const std::vector<TScanRangeParams>& scan_range_params) { + _total_numbers = scan_range_params[0].scan_range.tvf_scan_range.numbers_params.totalNumbers; + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/tablefunction/vnumbers_tbf.h b/be/src/vec/exec/tablefunction/vnumbers_tbf.h new file mode 100644 index 0000000000..c2b5199c85 --- /dev/null +++ b/be/src/vec/exec/tablefunction/vnumbers_tbf.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> + +#include "runtime/descriptors.h" +#include "vec/exec/tablefunction/vtable_valued_function_inf.h" + +namespace doris { + +class TextConverter; +class Tuple; +class TupleDescriptor; +class RuntimeState; +class MemPool; +class Status; + +namespace vectorized { + +class VNumbersTBF : public VTableValuedFunctionInf { +public: + VNumbersTBF(TupleId tuple_id, const TupleDescriptor* tuple_desc); + ~VNumbersTBF() = default; + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + +protected: + int64_t _total_numbers; + // Number of returned columns, actually only 1 column + int _slot_num = 1; + int64_t _cur_offset = 0; +}; + +} // namespace vectorized + +} // namespace doris diff --git a/be/src/vec/exec/tablefunction/vtable_valued_function_inf.h b/be/src/vec/exec/tablefunction/vtable_valued_function_inf.h new file mode 100644 index 0000000000..9e2898ffb3 --- /dev/null +++ b/be/src/vec/exec/tablefunction/vtable_valued_function_inf.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <memory> + +#include "common/global_types.h" +#include "runtime/descriptors.h" +#include "vec/core/block.h" + +namespace doris { + +class RuntimeState; +class Status; + +namespace vectorized { + +class VTableValuedFunctionInf { +public: + VTableValuedFunctionInf(TupleId tuple_id, const TupleDescriptor* tuple_desc) + : _tuple_id(tuple_id), _tuple_desc(tuple_desc) {} + + ~VTableValuedFunctionInf() = default; + + // Should set function parameters in this method + virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0; + virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; + Status close(RuntimeState* state) { return Status::OK(); } + + void set_tuple_desc(const TupleDescriptor* tuple_desc) { _tuple_desc = tuple_desc; } + +protected: + TupleId _tuple_id; + // Descriptor of tuples generated + const TupleDescriptor* _tuple_desc; +}; + +} // namespace vectorized + +} // namespace doris diff --git a/be/src/vec/exec/vtable_valued_function_scannode.cpp b/be/src/vec/exec/vtable_valued_function_scannode.cpp new file mode 100644 index 0000000000..8f810b17c3 --- /dev/null +++ b/be/src/vec/exec/vtable_valued_function_scannode.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vtable_valued_function_scannode.h" + +#include <sstream> + +#include "common/status.h" +#include "gen_cpp/PlanNodes_types.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/string_value.h" +#include "runtime/tuple_row.h" +#include "util/runtime_profile.h" +#include "vec/exec/tablefunction/vnumbers_tbf.h" + +namespace doris::vectorized { + +VTableValuedFunctionScanNode::VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _is_init(false), + _tuple_id(tnode.table_valued_func_scan_node.tuple_id), + _tuple_desc(nullptr) { + // set _table_func here + switch (tnode.table_valued_func_scan_node.func_name) { + case TTVFunctionName::NUMBERS: + _table_func = std::make_shared<VNumbersTBF>(_tuple_id, _tuple_desc); + break; + default: + LOG(FATAL) << "Unsupported function type"; + } +} + +Status VTableValuedFunctionScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VTableValuedFunctionScanNode::Prepare"; + + if (_is_init) { + return Status::OK(); + } + + if (nullptr == state) { + return Status::InternalError("input pointer is nullptr."); + } + + RETURN_IF_ERROR(ScanNode::prepare(state)); + // get tuple desc + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + _table_func->set_tuple_desc(_tuple_desc); + + if (nullptr == _tuple_desc) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + _is_init = true; + return Status::OK(); +} + +Status VTableValuedFunctionScanNode::open(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::open(state)); + + if (nullptr == state) { + return Status::InternalError("input pointer is nullptr."); + } + + if (!_is_init) { + return Status::InternalError("used before initialize."); + } + + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); + 
RETURN_IF_CANCELLED(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + + return Status::OK(); +} + +Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { + LOG(FATAL) << "VTableValuedFunctionScanNode only support vectorized execution"; + return Status::OK(); +} + +Status VTableValuedFunctionScanNode::get_next(RuntimeState* state, vectorized::Block* block, + bool* eos) { + if (state == nullptr || block == nullptr || eos == nullptr) { + return Status::InternalError("input is NULL pointer"); + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); + RETURN_IF_CANCELLED(state); + Status res = _table_func->get_next(state, block, eos); + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns())); + reached_limit(block, eos); + return res; +} + +Status VTableValuedFunctionScanNode::close(RuntimeState* state) { + if (is_closed()) { + return Status::OK(); + } + RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); + _table_func->close(state); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + + return ExecNode::close(state); +} + +Status VTableValuedFunctionScanNode::set_scan_ranges( + const std::vector<TScanRangeParams>& scan_ranges) { + return _table_func->set_scan_ranges(scan_ranges); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vtable_valued_function_scannode.h b/be/src/vec/exec/vtable_valued_function_scannode.h new file mode 100644 index 0000000000..9dfca9b63d --- /dev/null +++ b/be/src/vec/exec/vtable_valued_function_scannode.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <memory> + +#include "exec/scan_node.h" +#include "runtime/descriptors.h" +#include "vec/exec/tablefunction/vtable_valued_function_inf.h" + +namespace doris { + +class TextConverter; +class Tuple; +class TupleDescriptor; +class RuntimeState; +class MemPool; +class Status; + +namespace vectorized { + +class VTableValuedFunctionScanNode : public ScanNode { +public: + VTableValuedFunctionScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + ~VTableValuedFunctionScanNode() override = default; + + // initialize _mysql_scanner, and create _text_converter. + Status prepare(RuntimeState* state) override; + + // Start MySQL scan using _mysql_scanner. + Status open(RuntimeState* state) override; + + // Fill the next row batch by calling next() on the _mysql_scanner, + // converting text data in MySQL cells to binary data. + Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + + // Close the _mysql_scanner, and report errors. 
+ Status close(RuntimeState* state) override; + + // No use + Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + +protected: + std::shared_ptr<VTableValuedFunctionInf> _table_func; + bool _is_init; + // Tuple id resolved in prepare() to set _tuple_desc; + TupleId _tuple_id; + + // Descriptor of tuples generated + const TupleDescriptor* _tuple_desc; +}; + +} // namespace vectorized + +} // namespace doris diff --git a/docs/.vuepress/sidebar/en/docs.js b/docs/.vuepress/sidebar/en/docs.js index 578f0b426d..0e4b62df86 100644 --- a/docs/.vuepress/sidebar/en/docs.js +++ b/docs/.vuepress/sidebar/en/docs.js @@ -520,7 +520,8 @@ module.exports = [ "explode-bitmap", "explode-split", "explode-json-array", - "outer-combinator" + "outer-combinator", + "numbers" ], }, { diff --git a/docs/.vuepress/sidebar/zh-CN/docs.js b/docs/.vuepress/sidebar/zh-CN/docs.js index d6eeda84fa..d94759c866 100644 --- a/docs/.vuepress/sidebar/zh-CN/docs.js +++ b/docs/.vuepress/sidebar/zh-CN/docs.js @@ -520,7 +520,8 @@ module.exports = [ "explode-bitmap", "explode-split", "explode-json-array", - "outer-combinator" + "outer-combinator", + "numbers" ], }, { diff --git a/docs/en/docs/sql-manual/sql-functions/string-functions/unhex.md b/docs/en/docs/sql-manual/sql-functions/string-functions/unhex.md index 8c0f5266c0..42bd20f762 100644 --- a/docs/en/docs/sql-manual/sql-functions/string-functions/unhex.md +++ b/docs/en/docs/sql-manual/sql-functions/string-functions/unhex.md @@ -31,7 +31,7 @@ under the License. `VARCHAR unhex(VARCHAR str)` Enter a string, if the length of the string is 0 or an odd number, an empty string is returned; -If the string contains characters other than `[0-9], [a-z], [A-Z]`, an empty string is returned; +If the string contains characters other than `[0-9], [a-f], [A-F]`, an empty string is returned; In other cases, every two characters are a group of characters converted into hexadecimal, and then spliced into a string for output. 
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md b/docs/en/docs/sql-manual/sql-functions/table-functions/numbers.md similarity index 54% copy from docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md copy to docs/en/docs/sql-manual/sql-functions/table-functions/numbers.md index a8af9f1a8e..ca0f5de915 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/numbers.md @@ -1,11 +1,11 @@ --- { - "title": "unhex", - "language": "zh-CN" + "title": "numbers", + "language": "en" } --- -<!-- +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -24,40 +24,42 @@ specific language governing permissions and limitations under the License. --> -## unhex +## `numbers` + ### description -#### Syntax -`VARCHAR unhex(VARCHAR str)` +Table-valued function that generates a temporary table with a single column named `number`, whose rows hold the values [0, n). -输入字符串,如果字符串长度为0或者为奇数,返回空串; -如果字符串中包含`[0-9]、[a-z]、[A-Z]`之外的字符,返回空串; -其他情况每两个字符为一组转化为16进制后的字符,然后拼接成字符串输出 +This function is used in the FROM clause. +Syntax: +``` +FROM numbers(n[,m]); +``` -### example +Parameters: +- `n`: The number of rows to generate; the values are [0, n). +- `m`: Optional parameter. The function is executed concurrently on `m` BE nodes (multiple BEs need to be deployed). 
+### example ``` -mysql> select unhex('@'); -+------------+ -| unhex('@') | -+------------+ -| | -+------------+ - -mysql> select unhex('41'); -+-------------+ -| unhex('41') | -+-------------+ -| A | -+-------------+ - -mysql> select unhex('4142'); -+---------------+ -| unhex('4142') | -+---------------+ -| AB | -+---------------+ +mysql> select * from numbers("10"); ++--------+ +| number | ++--------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | +| 5 | +| 6 | +| 7 | +| 8 | +| 9 | ++--------+ ``` + ### keywords -UNHEX + + numbers \ No newline at end of file diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md b/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md index a8af9f1a8e..28b50e966c 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md @@ -31,7 +31,7 @@ under the License. `VARCHAR unhex(VARCHAR str)` 输入字符串,如果字符串长度为0或者为奇数,返回空串; -如果字符串中包含`[0-9]、[a-z]、[A-Z]`之外的字符,返回空串; +如果字符串中包含`[0-9]、[a-f]、[A-F]`之外的字符,返回空串; 其他情况每两个字符为一组转化为16进制后的字符,然后拼接成字符串输出 diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/numbers.md similarity index 56% copy from docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md copy to docs/zh-CN/docs/sql-manual/sql-functions/table-functions/numbers.md index a8af9f1a8e..60605f7282 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/string-functions/unhex.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/numbers.md @@ -1,11 +1,11 @@ --- { - "title": "unhex", + "title": "numbers", "language": "zh-CN" } --- -<!-- +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -24,40 +24,45 @@ specific language governing permissions and limitations under the License. 
--> -## unhex +## `numbers` + ### description -#### Syntax -`VARCHAR unhex(VARCHAR str)` +表函数,生成一张只含有一列的临时表,列名为`number`,行的值为[0,n)。 -输入字符串,如果字符串长度为0或者为奇数,返回空串; -如果字符串中包含`[0-9]、[a-z]、[A-Z]`之外的字符,返回空串; -其他情况每两个字符为一组转化为16进制后的字符,然后拼接成字符串输出 +该函数用于from子句中。 +语法: -### example +``` +FROM numbers(n[,m]); +``` + +参数: +- `n`: 代表生成[0,n)的行。 +- `m`: 可选参数,代表`m`个be节点同时执行该函数(需要部署多个be)。 +### example ``` -mysql> select unhex('@'); -+------------+ -| unhex('@') | -+------------+ -| | -+------------+ - -mysql> select unhex('41'); -+-------------+ -| unhex('41') | -+-------------+ -| A | -+-------------+ - -mysql> select unhex('4142'); -+---------------+ -| unhex('4142') | -+---------------+ -| AB | -+---------------+ +mysql> select * from numbers("10"); ++--------+ +| number | ++--------+ +| 0 | +| 1 | +| 2 | +| 3 | +| 4 | +| 5 | +| 6 | +| 7 | +| 8 | +| 9 | ++--------+ ``` + ### keywords -UNHEX + + numbers + + diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 5268236f21..35cf35cf65 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -404,6 +404,7 @@ nonterminal WithClause opt_with_clause; nonterminal ArrayList<View> with_view_def_list; nonterminal View with_view_def; nonterminal Subquery subquery; +nonterminal TableValuedFunctionRef table_valued_function_ref; nonterminal InlineViewRef inline_view_ref; nonterminal JoinOperator join_operator; nonterminal ArrayList<String> opt_plan_hints; @@ -4271,6 +4272,17 @@ table_ref ::= s.setLateralViewRefs(lateralViewRefList); RESULT = s; :} + | table_valued_function_ref:f + {: + RESULT = f; + :} + ; + +table_valued_function_ref ::= + ident:func_name LPAREN string_list:param_list RPAREN opt_table_alias:alias + {: + RESULT = new TableValuedFunctionRef(func_name, alias, param_list); + :} ; inline_view_ref ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java 
index 42a3225699..a1f9fd9797 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -299,6 +299,10 @@ public class SelectStmt extends QueryStmt { // Inline view reference QueryStmt inlineStmt = ((InlineViewRef) tblRef).getViewStmt(); inlineStmt.getTables(analyzer, tableMap, parentViewNameSet); + } else if (tblRef instanceof TableValuedFunctionRef) { + TableValuedFunctionRef tblFuncRef = (TableValuedFunctionRef) tblRef; + tableMap.put(tblFuncRef.getTableFunction().getTable().getId(), + tblFuncRef.getTableFunction().getTable()); } else { String dbName = tblRef.getName().getDb(); String tableName = tblRef.getName().getTbl(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java new file mode 100644 index 0000000000..c34c9c8743 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Table; +import org.apache.doris.common.UserException; +import org.apache.doris.tablefunction.TableValuedFunctionInf; + +import java.util.List; + +public class TableValuedFunctionRef extends TableRef { + + private Table table; + private TableValuedFunctionInf tableFunction; + + public TableValuedFunctionRef(String funcName, String alias, List<String> params) throws UserException { + super(new TableName(null, "_table_valued_function_" + funcName), alias); + this.tableFunction = TableValuedFunctionInf.getTableFunction(funcName, params); + if (hasExplicitAlias()) { + return; + } + aliases = new String[] { "_table_valued_function_" + funcName }; + } + + public TableValuedFunctionRef(TableValuedFunctionRef other) { + super(other); + this.tableFunction = other.tableFunction; + } + + @Override + public TableRef clone() { + return new TableValuedFunctionRef(this); + } + + @Override + public TupleDescriptor createTupleDescriptor(Analyzer analyzer) { + TupleDescriptor result = analyzer.getDescTbl().createTupleDescriptor(); + result.setTable(table); + return result; + } + + /** + * Register this table ref and then analyze the Join clause. 
+ */ + @Override + public void analyze(Analyzer analyzer) throws UserException { + if (isAnalyzed) { + return; + } + // Table function could generate a table which will has columns + // Maybe will call be during this process + this.table = tableFunction.getTable(); + desc = analyzer.registerTableRef(this); + isAnalyzed = true; // true that we have assigned desc + analyzeJoin(analyzer); + } + + public TableValuedFunctionInf getTableFunction() { + return tableFunction; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 3802139839..d2f1427e5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -96,7 +96,7 @@ public interface TableIf { * Doris table type. */ public enum TableType { - MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI; + MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, TABLE_VALUED_FUNCTION; public String toEngineName() { switch (this) { @@ -120,6 +120,8 @@ public interface TableIf { return "Hive"; case HUDI: return "Hudi"; + case TABLE_VALUED_FUNCTION: + return "Table_Valued_Function"; default: return null; } @@ -140,6 +142,7 @@ public interface TableIf { case ELASTICSEARCH: case HIVE: case HUDI: + case TABLE_VALUED_FUNCTION: return "EXTERNAL TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 46f168ab40..1b58ecfb6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -236,7 +236,6 @@ public class DistributedPlanner { // move 'result' to end, it depends on all of its children fragments.remove(result); 
fragments.add(result); - if (!isPartitioned && result.isPartitioned() && result.getPlanRoot().getNumInstances() > 1) { result = createMergeFragment(result); fragments.add(result); @@ -276,6 +275,8 @@ public class DistributedPlanner { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED); } else if (node instanceof SchemaScanNode) { return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.UNPARTITIONED); + } else if (node instanceof TableValuedFunctionScanNode) { + return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); } else if (node instanceof OlapScanNode) { // olap scan node OlapScanNode olapScanNode = (OlapScanNode) node; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 38711d025f..3e9a3ae137 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -47,6 +47,7 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TableRef; +import org.apache.doris.analysis.TableValuedFunctionRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.analysis.TupleIsNullPredicate; @@ -1721,6 +1722,10 @@ public class SingleNodePlanner { scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode", null, -1); break; + case TABLE_VALUED_FUNCTION: + scanNode = new TableValuedFunctionScanNode(ctx.getNextNodeId(), tblRef.getDesc(), + "TableValuedFunctionScanNode", ((TableValuedFunctionRef) tblRef).getTableFunction()); + break; default: break; } @@ -1892,7 +1897,7 @@ public class SingleNodePlanner { private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt) throws 
UserException { PlanNode scanNode = null; - if (tblRef instanceof BaseTableRef) { + if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) { scanNode = createScanNode(analyzer, tblRef, selectStmt); } if (tblRef instanceof InlineViewRef) { @@ -2176,8 +2181,8 @@ public class SingleNodePlanner { * @param analyzer */ private void materializeTableResultForCrossJoinOrCountStar(TableRef tblRef, Analyzer analyzer) { - if (tblRef instanceof BaseTableRef) { - materializeSlotForEmptyMaterializedTableRef((BaseTableRef) tblRef, analyzer); + if (tblRef instanceof BaseTableRef || tblRef instanceof TableValuedFunctionRef) { + materializeSlotForEmptyMaterializedTableRef(tblRef, analyzer); } else if (tblRef instanceof InlineViewRef) { materializeInlineViewResultExprForCrossJoinOrCountStar((InlineViewRef) tblRef, analyzer); } else { @@ -2203,7 +2208,7 @@ public class SingleNodePlanner { * @param tblRef * @param analyzer */ - private void materializeSlotForEmptyMaterializedTableRef(BaseTableRef tblRef, Analyzer analyzer) { + private void materializeSlotForEmptyMaterializedTableRef(TableRef tblRef, Analyzer analyzer) { if (tblRef.getDesc().getMaterializedSlots().isEmpty()) { Column minimuColumn = null; for (Column col : tblRef.getTable().getBaseSchema()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java new file mode 100644 index 0000000000..af2c26dfca --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TableValuedFunctionScanNode.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.tablefunction.TableValuedFunctionInf; +import org.apache.doris.tablefunction.TableValuedFunctionTask; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TTableValuedFunctionScanNode; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * This scan node is used for table valued function. 
+ */ +public class TableValuedFunctionScanNode extends ScanNode { + private static final Logger LOG = LogManager.getLogger(TableValuedFunctionScanNode.class.getName()); + + private List<TScanRangeLocations> shardScanRanges; + private TableValuedFunctionInf tvf; + private boolean isFinalized = false; + + public TableValuedFunctionScanNode(PlanNodeId id, TupleDescriptor desc, + String planNodeName, TableValuedFunctionInf tvf) { + super(id, desc, planNodeName, StatisticalType.TABLE_VALUED_FUNCTION_NODE); + this.tvf = tvf; + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + computeStats(analyzer); + } + + @Override + public int getNumInstances() { + return shardScanRanges.size(); + } + + @Override + public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) { + return shardScanRanges; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + if (isFinalized) { + return; + } + try { + shardScanRanges = getShardLocations(); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + + isFinalized = true; + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.TABLE_VALUED_FUNCTION_SCAN_NODE; + TTableValuedFunctionScanNode tvfScanNode = new TTableValuedFunctionScanNode(); + tvfScanNode.setTupleId(desc.getId().asInt()); + tvfScanNode.setFuncName(tvf.getFuncName()); + msg.table_valued_func_scan_node = tvfScanNode; + } + + private List<TScanRangeLocations> getShardLocations() throws AnalysisException { + List<TScanRangeLocations> result = Lists.newArrayList(); + for (TableValuedFunctionTask task : tvf.getTasks()) { + TScanRangeLocations locations = new TScanRangeLocations(); + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackendId(task.getBackend().getId()); + location.setServer(new TNetworkAddress(task.getBackend().getHost(), task.getBackend().getBePort())); + 
locations.addToLocations(location); + locations.setScanRange(task.getExecParams()); + result.add(locations); + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8585872c84..2ef4ec58fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -577,9 +577,13 @@ public class Coordinator { int profileFragmentId = 0; long memoryLimit = queryOptions.getMemLimit(); Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap(); - // If #fragments >=3, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, + // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, // else use exec_plan_fragments directly. - boolean twoPhaseExecution = fragments.size() >= 3; + // we choose #fragments >=2 because in some cases + // we need ensure that A fragment is already prepared to receive data before B fragment sends data. + // For example: select * from numbers("10","w") will generate ExchangeNode and TableValuedFunctionScanNode, + // we should ensure TableValuedFunctionScanNode does not send data until ExchangeNode is ready to receive. 
+ boolean twoPhaseExecution = fragments.size() >= 2; for (PlanFragment fragment : fragments) { FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 990b5042f3..cc806f7b3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -44,4 +44,5 @@ public enum StatisticalType { STREAM_LOAD_SCAN_NODE, TABLE_FUNCTION_NODE, UNION_NODE, + TABLE_VALUED_FUNCTION_NODE, } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java new file mode 100644 index 0000000000..5ef482ab2d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TTVFNumbersScanRange; +import org.apache.doris.thrift.TTVFScanRange; +import org.apache.doris.thrift.TTVFunctionName; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +// Table function that generate int64 numbers +// have a single column number + +/** + * The Implement of table valued function——numbers(N,M). + */ +public class NumbersTableValuedFunction extends TableValuedFunctionInf { + public static final String NAME = "numbers"; + private static final Logger LOG = LogManager.getLogger(PlanNode.class); + // The total numbers will be generated. + private long totalNumbers; + // The total backends will server it. + private int tabletsNum; + + /** + * Constructor. + * @param params params from user + * @throws UserException exception + */ + public NumbersTableValuedFunction(List<String> params) throws UserException { + if (params.size() < 1 || params.size() > 2) { + throw new UserException( + "numbers table function only support numbers(10000 /*total numbers*/)" + + "or numbers(10000, 2 /*number of tablets to run*/)"); + } + totalNumbers = Long.parseLong(params.get(0)); + // default tabletsNum is 1. 
+ tabletsNum = 1; + if (params.size() == 2) { + tabletsNum = Integer.parseInt(params.get(1)); + } + } + + @Override + public TTVFunctionName getFuncName() { + return TTVFunctionName.NUMBERS; + } + + @Override + public String getTableName() { + return "NumbersTableValuedFunction"; + } + + @Override + public List<Column> getTableColumns() { + List<Column> resColumns = new ArrayList<>(); + resColumns.add(new Column("number", PrimitiveType.BIGINT, false)); + return resColumns; + } + + @Override + public List<TableValuedFunctionTask> getTasks() throws AnalysisException { + List<Backend> backendList = Lists.newArrayList(); + for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + backendList.add(be); + } + } + if (backendList.isEmpty()) { + throw new AnalysisException("No Alive backends"); + } + Collections.shuffle(backendList); + List<TableValuedFunctionTask> res = Lists.newArrayList(); + for (int i = 0; i < tabletsNum; ++i) { + TScanRange scanRange = new TScanRange(); + TTVFScanRange tvfScanRange = new TTVFScanRange(); + TTVFNumbersScanRange tvfNumbersScanRange = new TTVFNumbersScanRange(); + tvfNumbersScanRange.setTotalNumbers(totalNumbers); + tvfScanRange.setNumbersParams(tvfNumbersScanRange); + scanRange.setTvfScanRange(tvfScanRange); + res.add(new TableValuedFunctionTask(backendList.get(i % backendList.size()), scanRange)); + } + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java new file mode 100644 index 0000000000..17a81e7787 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionInf.java @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TTVFunctionName; + +import java.util.List; + +public abstract class TableValuedFunctionInf { + + public abstract TTVFunctionName getFuncName(); + + public Table getTable() { + Table table = new Table(-1, getTableName(), TableIf.TableType.TABLE_VALUED_FUNCTION, getTableColumns()); + return table; + } + + // All table functions should be registered here + public static TableValuedFunctionInf getTableFunction(String funcName, List<String> params) throws UserException { + if (funcName.equalsIgnoreCase(NumbersTableValuedFunction.NAME)) { + return new NumbersTableValuedFunction(params); + } + throw new UserException("Could not find table function " + funcName); + } + + public abstract String getTableName(); + + public abstract List<Column> getTableColumns(); + + public abstract List<TableValuedFunctionTask> getTasks() throws AnalysisException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionTask.java similarity index 57% copy from 
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java copy to fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionTask.java index 990b5042f3..b535b2b1a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionTask.java @@ -15,33 +15,27 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.statistics; +package org.apache.doris.tablefunction; -public enum StatisticalType { - DEFAULT, - AGG_NODE, - ANALYTIC_EVAL_NODE, - ASSERT_NUM_ROWS_NODE, - BROKER_SCAN_NODE, - CROSS_JOIN_NODE, - EMPTY_SET_NODE, - ES_SCAN_NODE, - EXCEPT_NODE, - EXCHANGE_NODE, - HASH_JOIN_NODE, - HIVE_SCAN_NODE, - ICEBERG_SCAN_NODE, - INTERSECT_NODE, - LOAD_SCAN_NODE, - MYSQL_SCAN_NODE, - ODBC_SCAN_NODE, - OLAP_SCAN_NODE, - REPEAT_NODE, - SELECT_NODE, - SET_OPERATION_NODE, - SCHEMA_SCAN_NODE, - SORT_NODE, - STREAM_LOAD_SCAN_NODE, - TABLE_FUNCTION_NODE, - UNION_NODE, +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TScanRange; + +public class TableValuedFunctionTask { + // Expected running backend + private Backend backend; + // Function running parameters + private TScanRange execParams; + + public TableValuedFunctionTask(Backend backend, TScanRange execParams) { + this.backend = backend; + this.execParams = execParams; + } + + public Backend getBackend() { + return backend; + } + + public TScanRange getExecParams() { + return execParams; + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index fb3a87fc4a..4dcddcfd1e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -52,6 +52,7 @@ enum TPlanNodeType { EXCEPT_NODE, ODBC_SCAN_NODE, TABLE_FUNCTION_NODE, + TABLE_VALUED_FUNCTION_SCAN_NODE, } // phases of an execution node @@ -222,6 +223,20 @@ struct TExternalScanRange { // TODO: add more scan range type? 
} +enum TTVFunctionName { + NUMBERS = 0, +} + +// Every table valued function should have a scan range definition to save its +// running parameters +struct TTVFNumbersScanRange { + 1: optional i64 totalNumbers +} + +struct TTVFScanRange { + 1: optional TTVFNumbersScanRange numbers_params +} + // Specification of an individual data range which is held in its entirety // by a storage server struct TScanRange { @@ -231,6 +246,7 @@ struct TScanRange { 6: optional TBrokerScanRange broker_scan_range 7: optional TEsScanRange es_scan_range 8: optional TExternalScanRange ext_scan_range + 9: optional TTVFScanRange tvf_scan_range } struct TMySQLScanNode { @@ -757,6 +773,11 @@ struct TRuntimeFilterDesc { 9: optional i64 bloom_filter_size_bytes } +struct TTableValuedFunctionScanNode { + 1: optional Types.TTupleId tuple_id + 2: optional TTVFunctionName func_name +} + // This is essentially a union of all messages corresponding to subclasses // of PlanNode. struct TPlanNode { @@ -808,6 +829,7 @@ struct TPlanNode { // output column 42: optional list<Types.TSlotId> output_slot_ids + 43: optional TTableValuedFunctionScanNode table_valued_func_scan_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first diff --git a/regression-test/data/correctness/table_valued_function/test_numbers.out b/regression-test/data/correctness/table_valued_function/test_numbers.out new file mode 100644 index 0000000000..be43ffc8ce --- /dev/null +++ b/regression-test/data/correctness/table_valued_function/test_numbers.out @@ -0,0 +1,808 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !basic1 -- +0 + +-- !basic2 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +-- !basic3 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +40 +41 +42 +43 +44 +45 +46 +47 +48 +49 +50 +51 +52 +53 +54 +55 +56 +57 +58 +59 +60 +61 +62 +63 +64 +65 +66 +67 +68 +69 +70 +71 +72 +73 +74 +75 +76 +77 +78 +79 +80 +81 +82 +83 +84 +85 +86 +87 +88 +89 +90 +91 +92 +93 +94 +95 +96 +97 +98 +99 + +-- !basic4_limit -- +0 +1 +2 +3 +4 + +-- !agg_sum -- +4950 + +-- !agg_avg -- +49.5 + +-- !agg_count -- +100 + +-- !agg_min -- +0 + +-- !agg_max -- +99 + +-- !inner_join1 -- +0 0 +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 +7 7 +8 8 +9 9 + +-- !inner_join2 -- +1 0 +2 0 +2 1 +3 0 +3 1 +3 2 +4 0 +4 1 +4 2 +4 3 +5 0 +5 1 +5 2 +5 3 +5 4 + +-- !inner_join3 -- +0 0 +2 2 +4 4 +6 6 +8 8 + +-- !left_join -- +0 0 +1 1 +2 2 +3 3 +4 4 +5 \N +6 \N +7 \N +8 \N +9 \N + +-- !right_join -- +0 0 +1 1 +2 2 +3 3 +4 4 +\N 7 +\N 9 +\N 6 +\N 8 +\N 5 + +-- !where_equal -- +1 +3 +5 +7 +9 + +-- !where_gt -- +3 +4 +5 +6 +7 +8 +9 + +-- !where_lt -- +0 +1 +2 +3 +4 +5 +6 +7 + +-- !groupby -- +7 +9 +6 +8 +5 +4 + +-- !join_where -- +5 5 +6 6 +7 7 +8 8 +9 9 + +-- !subquery1 -- +1 + +-- !subquery2 -- +6 +7 +8 +9 + +-- !subquery3 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +-- !window_1 -- +1 0 +2 1 +3 2 +4 3 +5 4 +6 5 +7 6 +8 7 +9 8 +10 9 + +-- !window_2 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 9 +9 10 + +-- !window_3 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 9 +9 10 + +-- !window_4 -- +0 1 +1 3 +2 6 +3 9 +4 12 +5 15 +6 18 +7 21 +8 24 +9 17 + +-- !window_5 -- +0 0 +1 0 +2 1 +3 2 +4 3 +5 4 +6 5 +7 6 +8 7 +9 8 + +-- !window_6 -- +0 0 +1 0 +2 0 +3 0 +4 0 +5 0 +6 0 +7 0 +8 0 +9 0 + +-- !window_7 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 9 +9 9 + +-- !window_8 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 9 +9 9 + +-- !window_9 -- +0 0.5 +1 1.0 +2 2.0 +3 3.0 +4 4.0 +5 5.0 +6 6.0 +7 7.0 +8 8.0 +9 
8.5 + +-- !window_10 -- +0 2 +1 3 +2 3 +3 3 +4 3 +5 3 +6 3 +7 3 +8 3 +9 2 + +-- !window_11 -- +0 0 +1 0 +2 1 +3 2 +4 3 +5 4 +6 5 +7 6 +8 7 +9 8 + +-- !window_12 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 9 +9 9 + +-- !window_13 -- +0 -1 +1 -1 +2 0 +3 1 +4 2 +5 3 +6 4 +7 5 +8 6 +9 7 + +-- !stringfunction_1 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +-- !stringfunction_2 -- +0a +1a +2a +3a +4a +5a +6a +7a +8a +9a + +-- !stringfunction_3 -- +0abcd +1abcd +2abcd +3abcd +4abcd +5abcd +6abcd +7abcd +8abcd +9abcd + +-- !stringfunction_4 -- +00 +11 +22 +33 +44 +55 +66 +77 +88 +99 + +-- !stringfunction_5 -- +48 +49 +50 +51 +52 +53 +54 +55 +56 +57 +49 +49 + +-- !stringfunction_6 -- +8 +8 +8 +8 +16 +16 +16 +16 + +-- !stringfunction_7 -- +1 +1 +1 +1 +2 +2 +2 +2 + +-- !stringfunction_8 -- +6-a +7-a +8-a +9-a +10-a +11-a +12-a +13-a + +-- !stringfunction_9 -- +0 false +1 true +2 false +3 false +4 false +5 false +6 false +7 false +8 false +9 false +10 false +11 true + +-- !stringfunction_10 -- +0 1 +1 2 +2 3 +3 4 +4 5 +5 6 +6 7 +7 8 +8 0 +9 0 + +-- !stringfunction_11 -- +6 6 +7 7 +8 8 +9 9 +10 A +11 B +12 C + +-- !stringfunction_12 -- +6 36 +7 37 +8 38 +9 39 +10 3130 +11 3131 +12 3132 + +-- !stringfunction_13 -- +6 0 +7 0 +8 0 +9 0 +10 1 +11 1 +12 1 + +-- !stringfunction_14 -- +121 12 +122 12 +123 12 +124 12 +125 12 +126 12 +127 12 +128 12 +129 12 +130 13 + +-- !stringfunction_15 -- +121 3 +122 3 +123 3 +124 3 +125 3 +126 3 +127 3 +128 3 +129 3 +130 3 + +-- !stringfunction_16 -- +121 2 +122 2 +123 2 +124 2 +125 2 +126 2 +127 2 +128 2 +129 2 +130 0 + +-- !stringfunction_17 -- +121 0 +122 3 +123 0 +124 0 +125 0 +126 0 +127 0 +128 0 +129 0 +130 0 + +-- !stringfunction_18 -- +96 096 +97 097 +98 098 +99 099 +100 100 +101 101 +102 102 +103 103 +104 104 +105 105 +106 106 +107 107 +108 108 +109 109 +110 110 + +-- !stringfunction_19 -- +a0 +a1 +a2 +a3 +a4 +a5 +a6 +a7 +a8 +a9 + +-- !stringfunction_20 -- +00 +11 +22 +33 +44 +55 +66 +77 +88 +99 +1010 +1111 +1212 + +-- !stringfunction_21 -- +0 +a 
+2 +3 +4 +5 +6 +7 +8 +9 +a0 +aa +a2 + +-- !stringfunction_22 -- +01 +11 +21 +31 +41 +51 +61 +71 +81 +91 + +-- !stringfunction_23 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 + +-- !stringfunction_24 -- +96 960 +97 970 +98 980 +99 990 +100 100 +101 101 +102 102 +103 103 +104 104 +105 105 +106 106 +107 107 +108 108 +109 109 +110 110 + +-- !stringfunction_25 -- +false +true +false +false +false +false +false +false +false +false +true +true +true +true +true + +-- !stringfunction_26 -- +10 +10 +10 +10 +11 +11 +11 +11 +11 +11 + +-- !stringfunction_27 -- +06 +07 +08 +09 +10 +11 +12 +13 +14 +15 + +-- !stringfunction_28 -- +06 +07 +08 +09 +10 +11 +12 +13 +14 +15 + +-- !stringfunction_29 -- +6 +7 +8 +9 +0 +1 +2 +3 +4 +5 + +-- !stringfunction_30 -- +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 +21 ! +22 " +23 # +24 $ +25 % +26 & +27 ' +28 ( +29 ) + diff --git a/regression-test/suites/correctness/table_valued_function/test_numbers.groovy b/regression-test/suites/correctness/table_valued_function/test_numbers.groovy new file mode 100644 index 0000000000..32f85ba62c --- /dev/null +++ b/regression-test/suites/correctness/table_valued_function/test_numbers.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one + // or more contributor license agreements. See the NOTICE file + // distributed with this work for additional information + // regarding copyright ownership. The ASF licenses this file + // to you under the Apache License, Version 2.0 (the + // "License"); you may not use this file except in compliance + // with the License. You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, + // software distributed under the License is distributed on an + // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + // KIND, either express or implied. 
See the License for the + // specific language governing permissions and limitations + // under the License. + + + suite("test_numbers", "correctness") { + // Test basic features + qt_basic1 """ select * from numbers("1"); """ + qt_basic2 """ select * from numbers("10"); """ + qt_basic3 """ select * from numbers("100"); """ + qt_basic4_limit """ select * from numbers("10") limit 5; """ + + // Test aggregate function withh numbers(N) + qt_agg_sum """ select sum(number) from numbers("100"); """ + qt_agg_avg """ select avg(number) from numbers("100"); """ + qt_agg_count """ select count(*) from numbers("100"); """ + qt_agg_min """ select min(number) from numbers("100"); """ + qt_agg_max """ select max(number) from numbers("100"); """ + + // Test join with numbers(N) + qt_inner_join1 """ + select a.number as num1, b.number as num2 + from numbers("10") a inner join numbers("10") b + on a.number=b.number; + """ + qt_inner_join2 """ + select a.number as num1, b.number as num2 + from numbers("6") a inner join numbers("6") b + on a.number>b.number; + """ + qt_inner_join3 """ + select a.number as num1, b.number as num2 + from numbers("10") a inner join numbers("10") b + on a.number=b.number and b.number%2 = 0; + """ + qt_left_join """ + select a.number as num1, b.number as num2 + from numbers("10") a left join numbers("5") b + on a.number=b.number; + """ + qt_right_join """ + select a.number as num1, b.number as num2 + from numbers("5") a right join numbers("10") b + on a.number=b.number; + """ + + // Test where and GroupBy + qt_where_equal """ select * from numbers("10") where number%2 = 1; """ + qt_where_gt """ select * from numbers("10") where number-1 > 1; """ + qt_where_lt """ select * from numbers("10") where number+1 < 9; """ + qt_groupby """ select number from numbers("10") where number>=4 group by number; """ + qt_join_where """ + select a.number as num1, b.number as num2 + from numbers("10") a inner join numbers("10") b + on a.number=b.number where a.number>4; + 
""" + + // Test Sub Query + qt_subquery1 """ select * from numbers("10") where number = (select number from numbers("10") where number=1); """ + qt_subquery2 """ select * from numbers("10") where number in (select number from numbers("10") where number>5); """ + qt_subquery3 """ select a.number from numbers("10") a where number in (select number from numbers("10") b where a.number=b.number); """ + + // Test window function + qt_window_1 """ SELECT row_number() OVER (ORDER BY number) AS id,number from numbers("10"); """ + qt_window_2 """ SELECT number, rank() OVER (order by number) AS sum_three from numbers("10"); """ + qt_window_3 """ SELECT number, dense_rank() OVER (order by number) AS sum_three from numbers("10"); """ + qt_window_4 """ SELECT number, sum(number) OVER (ORDER BY number rows between 1 preceding and 1 following) AS result from numbers("10"); """ + qt_window_5 """ SELECT number, min(number) OVER (ORDER BY number rows between 1 PRECEDING and 1 following) AS result from numbers("10"); """ + qt_window_6 """ SELECT number, min(number) OVER (ORDER BY number rows between UNBOUNDED PRECEDING and 1 following) AS result from numbers("10"); """ + qt_window_7 """ SELECT number, max(number) OVER (ORDER BY number rows between 1 preceding and 1 following) AS result from numbers("10"); """ + qt_window_8 """ SELECT number, max(number) OVER (ORDER BY number rows between UNBOUNDED PRECEDING and 1 following) AS result from numbers("10"); """ + qt_window_9 """ SELECT number, avg(number) OVER (ORDER BY number rows between 1 preceding and 1 following) AS result from numbers("10"); """ + qt_window_10 """ SELECT number, count(number) OVER (ORDER BY number rows between 1 preceding and 1 following) AS result from numbers("10"); """ + qt_window_11 """ SELECT number, first_value(number) OVER (ORDER BY number rows between 1 preceding and 1 following) AS result from numbers("10"); """ + qt_window_12 """ SELECT number, last_value(number) OVER (ORDER BY number rows between 1 
preceding and 1 following) AS result from numbers("10"); """ + qt_window_13 """ SELECT number, LAG(number,2,-1) OVER (ORDER BY number) AS result from numbers("10"); """ + + // Cast BITINT to STRING and test string function. + qt_stringfunction_1 """ select cast (number as string) as string_num from numbers("10"); """ + qt_stringfunction_2 """ select append_trailing_char_if_absent(cast (number as string),'a') as string_fucntion_res from numbers("10"); """ + qt_stringfunction_3 """ select concat(cast (number as string),'abc','d') as string_fucntion_res from numbers("10"); """ + qt_stringfunction_4 """ select concat(cast (number as string), cast (number as string)) as string_fucntion_res from numbers("10"); """ + qt_stringfunction_5 """ select ascii(cast (number as string)) as string_fucntion_res from numbers("12"); """ + qt_stringfunction_6 """ select bit_length(cast (number as string)) as string_fucntion_res from numbers("14") where number>5; """ + qt_stringfunction_7 """ select char_length(cast (number as string)) as string_fucntion_res from numbers("14") where number>5; """ + qt_stringfunction_8 """ select concat_ws('-',cast (number as string),'a') as string_fucntion_res from numbers("14") where number>5; """ + qt_stringfunction_9 """ select number, ends_with(cast (number as string),'1') as string_fucntion_res from numbers("12"); """ + qt_stringfunction_10 """ select number,find_in_set(cast (number as string),'0,1,2,3,4,5,6,7') as string_fucntion_res from numbers("10"); """ + qt_stringfunction_11 """ select number,hex(number) as string_fucntion_res from numbers("13") where number>5; """ + qt_stringfunction_12 """ select number,hex(cast (number as string)) as string_fucntion_res from numbers("13") where number>5; """ + qt_stringfunction_13 """ select number,instr(cast (number as string),'1') as string_fucntion_res from numbers("13") where number>5; """ + qt_stringfunction_14 """ select number,left(cast (number as string),'2') as string_fucntion_res from 
numbers("1000") where number>120 limit 10; """ + qt_stringfunction_15 """ select number,length(cast (number as string)) as string_fucntion_res from numbers("1000") where number>120 limit 10; """ + qt_stringfunction_16 """ select number,locate('2',cast (number as string)) as string_fucntion_res from numbers("1000") where number>120 limit 10; """ + qt_stringfunction_17 """ select number,locate('2',cast (number as string),3) as string_fucntion_res from numbers("1000") where number>120 limit 10; """ + qt_stringfunction_18 """ select number,lpad(cast (number as string),3,'0') as string_fucntion_res from numbers("1000") where number>95 limit 15; """ + qt_stringfunction_19 """ select ltrim( concat(' a',cast (number as string))) as string_fucntion_res from numbers("10"); """ + qt_stringfunction_20 """ select repeat(cast (number as string),2) as string_fucntion_res from numbers("13"); """ + qt_stringfunction_21 """ select replace(cast (number as string),'1','a') as string_fucntion_res from numbers("13"); """ + qt_stringfunction_22 """ select reverse(cast (number as string)) as string_fucntion_res from numbers("20") where number>9; """ + qt_stringfunction_23 """ select right(cast (number as string),1) as string_fucntion_res from numbers("20") where number>9; """ + qt_stringfunction_24 """ select number,rpad(cast (number as string),3,'0') as string_fucntion_res from numbers("1000") where number>95 limit 15; """ + qt_stringfunction_25 """ select STARTS_WITH(cast (number as string),'1') as string_fucntion_res from numbers("15"); """ + qt_stringfunction_26 """ select strleft(cast (number as string),'2') as string_fucntion_res from numbers("200") where number>105 limit 10; """ + qt_stringfunction_27 """ select strright(cast (number as string),'2') as string_fucntion_res from numbers("1000") where number>105 limit 10; """ + qt_stringfunction_28 """ select substring(cast (number as string),2) as string_fucntion_res from numbers("1000") where number>105 limit 10; """ + 
qt_stringfunction_29 """ select substring(cast (number as string),-1) as string_fucntion_res from numbers("1000") where number>105 limit 10; """ + qt_stringfunction_30 """ select number,unhex(cast (number as string)) as string_fucntion_res from numbers("100") limit 30; """ + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org