This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new acd5d67355 [feature-wip](new-scan)Add new odbc scanner and new odbc scan node (#12899) acd5d67355 is described below commit acd5d67355d71eac97dfcf0362f959b6c5c043dc Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Mon Sep 26 09:24:25 2022 +0800 [feature-wip](new-scan)Add new odbc scanner and new odbc scan node (#12899) --- be/src/exec/exec_node.cpp | 10 +- be/src/runtime/plan_fragment_executor.cpp | 4 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/scan/new_odbc_scan_node.cpp | 62 ++++++++ be/src/vec/exec/scan/new_odbc_scan_node.h | 42 ++++++ be/src/vec/exec/scan/new_odbc_scanner.cpp | 213 ++++++++++++++++++++++++++++ be/src/vec/exec/scan/new_odbc_scanner.h | 65 +++++++++ 7 files changed, 395 insertions(+), 3 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index be0b8690f2..4a9d648d1c 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -62,6 +62,7 @@ #include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vaggregation_node.h" #include "vec/exec/vanalytic_eval_node.h" @@ -448,7 +449,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN #endif case TPlanNodeType::ODBC_SCAN_NODE: if (state->enable_vectorized_exec()) { - *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode, descs)); + if (config::enable_new_scan_node) { + *node = pool->add(new vectorized::NewOdbcScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode, descs)); + } } else { *node = pool->add(new OdbcScanNode(pool, tnode, descs)); } @@ -725,7 +730,8 @@ void ExecNode::try_do_aggregate_serde_improve() { // TODO(cmy): should be removed when NewOlapScanNode is ready ExecNode* child0 = 
agg_node[0]->_children[0]; if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) || - typeid(*child0) == typeid(vectorized::NewFileScanNode)) { + typeid(*child0) == typeid(vectorized::NewFileScanNode) || + typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) { vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]); scan_node->set_no_agg_finalize(); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index ae5ee8f75a..2fe491a374 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -47,6 +47,7 @@ #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_odbc_scan_node.h" #include "vec/exec/scan/new_olap_scan_node.h" #include "vec/exec/vexchange_node.h" #include "vec/runtime/vdata_stream_mgr.h" @@ -168,7 +169,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. 
ExecNode* node = scan_nodes[i]; if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || - typeid(*node) == typeid(vectorized::NewFileScanNode)) { + typeid(*node) == typeid(vectorized::NewFileScanNode) || + typeid(*node) == typeid(vectorized::NewOdbcScanNode)) { vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]); const std::vector<TScanRangeParams>& scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 06ce19c014..16eabe1e45 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -253,6 +253,8 @@ set(VEC_FILES exec/scan/new_file_scanner.cpp exec/scan/new_file_text_scanner.cpp exec/scan/vfile_scanner.cpp + exec/scan/new_odbc_scanner.cpp + exec/scan/new_odbc_scan_node.cpp ) add_library(Vec STATIC diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp new file mode 100644 index 0000000000..48043dd22f --- /dev/null +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "vec/exec/scan/new_odbc_scan_node.h"
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+static const std::string NEW_SCAN_NODE_TYPE = "NewOdbcScanNode"; // tag used in log lines
+
+namespace doris::vectorized {
+
+NewOdbcScanNode::NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          _table_name(tnode.odbc_scan_node.table_name), // fix: was read from tnode.jdbc_scan_node
+          _odbc_scan_node(tnode.odbc_scan_node) { // copy kept so scanners can be built later
+    _output_tuple_id = tnode.odbc_scan_node.tuple_id;
+}
+
+std::string NewOdbcScanNode::get_name() { // display name: "VNewOdbcScanNode(<table name>)"
+    return fmt::format("VNewOdbcScanNode({0})", _table_name);
+}
+
+Status NewOdbcScanNode::prepare(RuntimeState* state) { // base prepare, then create the tracker
+    VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    _scanner_mem_tracker = std::make_unique<MemTracker>("NewOdbcScanner"); // handed to scanners
+    return Status::OK();
+}
+
+Status NewOdbcScanNode::_init_profile() { // no ODBC-specific counters; defers to the base class
+    return VScanNode::_init_profile();
+}
+
+
+Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) { // appends one scanner
+    if (_eos) { // nothing to scan: leave `scanners` untouched
+        return Status::OK();
+    }
+    NewOdbcScanner* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner,
+                                                 _scanner_mem_tracker.get(), _odbc_scan_node);
+    _scanner_pool.add(scanner); // registered with the pool before prepare() can fail
+    RETURN_IF_ERROR(scanner->prepare(_state));
+    scanners->push_back(static_cast<VScanner*>(scanner));
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h
new file mode 100644
index 0000000000..40d2bdd4bd
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exec/scan/vscan_node.h"
+namespace doris::vectorized {
+class NewOdbcScanNode : public VScanNode { // VScanNode-based scan node for ODBC_SCAN_NODE plans
+public:
+    NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    Status prepare(RuntimeState* state) override;
+
+    std::string get_name() override;
+
+    // Scan ranges are not used by this node, so this override is an intentional no-op.
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {}
+
+protected:
+    Status _init_profile() override;
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    std::string _table_name; // table name reported by get_name()
+    TOdbcScanNode _odbc_scan_node; // copy of the plan node's ODBC parameters
+    std::unique_ptr<MemTracker> _scanner_mem_tracker; // created in prepare()
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
new file mode 100644
index 0000000000..11943d8f04
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+#include "common/status.h"
+#include "exec/text_converter.hpp"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner"; // tag used in log lines
+
+namespace doris::vectorized {
+NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                               MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _odbc_eof(false),
+          _table_name(odbc_scan_node.table_name),
+          _connect_string(odbc_scan_node.connect_string),
+          _query_string(odbc_scan_node.query_string),
+          _tuple_id(odbc_scan_node.tuple_id),
+          _tuple_desc(nullptr) {} // resolved from _tuple_id in prepare()
+
+Status NewOdbcScanner::prepare(RuntimeState* state) { // builds connector + converter, once
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) { // repeated calls are no-ops
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    _odbc_param.connect_string = std::move(_connect_string); // members are moved-from after this
+    _odbc_param.query_string = std::move(_query_string);
+    _odbc_param.tuple_desc = _tuple_desc;
+
+    _odbc_connector.reset(new (std::nothrow) ODBCConnector(_odbc_param));
+
+    if (_odbc_connector == nullptr) {
+        return Status::InternalError("new a odbc scanner failed.");
+    }
+
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+
+    if (_text_converter == nullptr) {
+        return Status::InternalError("new a text convertor failed.");
+    }
+
+    _is_init = true;
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::open(RuntimeState* state) { // opens the connector and issues the query
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    if (!_is_init) { // prepare() must have succeeded first
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_odbc_connector->open());
+    RETURN_IF_ERROR(_odbc_connector->query());
+    // check materialize slot num
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+    RETURN_IF_CANCELLED(state);
+
+    if (_odbc_eof) { // the connector was already drained by an earlier call
+        *eof = true;
+        return Status::OK();
+    }
+
+    auto column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; ++i) { // size_t: column_size is unsigned
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        while (true) { // one iteration per fetched row; exits on full batch or ODBC eof
+            // block is full, break
+            if (state->batch_size() <= columns[0]->size()) {
+                break;
+            }
+
+            RETURN_IF_ERROR(_odbc_connector->get_next_row(&_odbc_eof));
+
+            if (_odbc_eof) {
+                if (block->rows() == 0) {
+                    *eof = true;
+                }
+                break;
+            }
+
+            // Read one row from reader
+            for (int column_index = 0, materialized_column_index = 0; column_index < column_size;
+                 ++column_index) {
+                auto slot_desc = _tuple_desc->slots()[column_index];
+                // because the fe planner filter the non_materialize column
+                if (!slot_desc->is_materialized()) {
+                    continue;
+                }
+                const auto& column_data =
+                        _odbc_connector->get_column_data(materialized_column_index);
+
+                char* value_data = static_cast<char*>(column_data.target_value_ptr);
+                int value_len = column_data.strlen_or_ind;
+
+                if (value_len == SQL_NULL_DATA) { // NULL value: allowed only for nullable slots
+                    if (slot_desc->is_nullable()) {
+                        columns[column_index]->insert_default();
+                    } else {
+                        return Status::InternalError(
+                                "nonnull column contains nullptr. table={}, column={}", _table_name,
+                                slot_desc->col_name());
+                    }
+                } else if (value_len > column_data.buffer_length) {
+                    return Status::InternalError(
+                            "column value length longer than buffer length. "
+                            "table={}, column={}, buffer_length={}", // fix: third {} was missing
+                            _table_name, slot_desc->col_name(), column_data.buffer_length);
+                } else {
+                    if (!_text_converter->write_column(slot_desc, &columns[column_index],
+                                                       value_data, value_len, true, false)) {
+                        std::stringstream ss;
+                        ss << "Fail to convert odbc value:'" << value_data << "' to "
+                           << slot_desc->type() << " on column:`" << slot_desc->col_name() + "`";
+                        return Status::InternalError(ss.str());
+                    }
+                }
+                materialized_column_index++; // counts only the materialized slots
+            }
+        }
+
+        // Before really use the Block, must clear other ptr of column in block
+        // So here need do std::move and clear in `columns`
+        if (!mem_reuse) {
+            int column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << NEW_SCANNER_TYPE << " output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::close(RuntimeState* state) { // only delegates to VScanner::close
+    RETURN_IF_ERROR(VScanner::close(state));
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h
new file mode 100644
index 0000000000..34cedb8095
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/odbc_connector.h"
+#include "exec/text_converter.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+class NewOdbcScanner : public VScanner { // reads rows through an ODBCConnector into Blocks
+public:
+    NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node);
+
+    Status open(RuntimeState* state) override;
+
+    // Close the odbc_scanner, and report errors.
+    Status close(RuntimeState* state) override;
+
+public:
+    Status prepare(RuntimeState* state); // must succeed before open() / _get_block_impl()
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    bool _is_init; // set to true at the end of a successful prepare()
+
+    // True once the connector has no more rows. Set by _odbc_connector->get_next_row().
+    bool _odbc_eof;
+
+    std::string _table_name; // used in error messages
+
+    std::string _connect_string; // moved into _odbc_param by prepare()
+
+    std::string _query_string; // moved into _odbc_param by prepare()
+    // Tuple id resolved in prepare() to set _tuple_desc.
+    TupleId _tuple_id;
+
+    // Descriptor of tuples read from ODBC table.
+    const TupleDescriptor* _tuple_desc;
+
+    // Scanner of ODBC.
+    std::unique_ptr<ODBCConnector> _odbc_connector;
+    ODBCConnectorParam _odbc_param;
+    // Helper class for converting text to other types.
+    std::unique_ptr<TextConverter> _text_converter;
+};
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org