This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new acd5d67355 [feature-wip](new-scan)Add new odbc scanner and new odbc scan node (#12899)
acd5d67355 is described below

commit acd5d67355d71eac97dfcf0362f959b6c5c043dc
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Mon Sep 26 09:24:25 2022 +0800

    [feature-wip](new-scan)Add new odbc scanner and new odbc scan node (#12899)
---
 be/src/exec/exec_node.cpp                   |  10 +-
 be/src/runtime/plan_fragment_executor.cpp   |   4 +-
 be/src/vec/CMakeLists.txt                   |   2 +
 be/src/vec/exec/scan/new_odbc_scan_node.cpp |  62 ++++++++
 be/src/vec/exec/scan/new_odbc_scan_node.h   |  42 ++++++
 be/src/vec/exec/scan/new_odbc_scanner.cpp   | 213 ++++++++++++++++++++++++++++
 be/src/vec/exec/scan/new_odbc_scanner.h     |  65 +++++++++
 7 files changed, 395 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index be0b8690f2..4a9d648d1c 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
 #include "vec/exec/file_scan_node.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vaggregation_node.h"
 #include "vec/exec/vanalytic_eval_node.h"
@@ -448,7 +449,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
 #endif
     case TPlanNodeType::ODBC_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
-            *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode, 
descs));
+            if (config::enable_new_scan_node) {
+                *node = pool->add(new vectorized::NewOdbcScanNode(pool, tnode, 
descs));
+            } else {
+                *node = pool->add(new vectorized::VOdbcScanNode(pool, tnode, 
descs));
+            }
         } else {
             *node = pool->add(new OdbcScanNode(pool, tnode, descs));
         }
@@ -725,7 +730,8 @@ void ExecNode::try_do_aggregate_serde_improve() {
     // TODO(cmy): should be removed when NewOlapScanNode is ready
     ExecNode* child0 = agg_node[0]->_children[0];
     if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) ||
-        typeid(*child0) == typeid(vectorized::NewFileScanNode)) {
+        typeid(*child0) == typeid(vectorized::NewFileScanNode) ||
+        typeid(*child0) == typeid(vectorized::NewOdbcScanNode)) {
         vectorized::VScanNode* scan_node =
                 static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
         scan_node->set_no_agg_finalize();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index ae5ee8f75a..2fe491a374 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -47,6 +47,7 @@
 #include "util/uid_util.h"
 #include "vec/core/block.h"
 #include "vec/exec/scan/new_file_scan_node.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
 #include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -168,7 +169,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         // TODO(cmy): this "if...else" should be removed once all ScanNode are 
derived from VScanNode.
         ExecNode* node = scan_nodes[i];
         if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
-            typeid(*node) == typeid(vectorized::NewFileScanNode)) {
+            typeid(*node) == typeid(vectorized::NewFileScanNode) ||
+            typeid(*node) == typeid(vectorized::NewOdbcScanNode)) {
             vectorized::VScanNode* scan_node = 
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
             const std::vector<TScanRangeParams>& scan_ranges =
                     find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 06ce19c014..16eabe1e45 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -253,6 +253,8 @@ set(VEC_FILES
   exec/scan/new_file_scanner.cpp
   exec/scan/new_file_text_scanner.cpp
   exec/scan/vfile_scanner.cpp
+  exec/scan/new_odbc_scanner.cpp
+  exec/scan/new_odbc_scan_node.cpp
   )
 
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
new file mode 100644
index 0000000000..48043dd22f
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scan_node.h"
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+static const std::string NEW_SCAN_NODE_TYPE = "NewOdbcScanNode";
+
+namespace doris::vectorized {
+
+NewOdbcScanNode::NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs),
+          // Take the table name from the ODBC part of the plan node, the same
+          // struct the tuple id and the scanner parameters come from
+          // (tnode.jdbc_scan_node is the wrong source for an ODBC scan node).
+          _table_name(tnode.odbc_scan_node.table_name),
+          _odbc_scan_node(tnode.odbc_scan_node) {
+    _output_tuple_id = tnode.odbc_scan_node.tuple_id;
+}
+
+// Returns the display name of this node, which includes the ODBC table name.
+std::string NewOdbcScanNode::get_name() {
+    return fmt::format("VNewOdbcScanNode({0})", _table_name);
+}
+
+// Prepares the VScanNode base, then creates the memory tracker that is passed
+// to every scanner built in _init_scanners().
+Status NewOdbcScanNode::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare";
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+    _scanner_mem_tracker = std::make_unique<MemTracker>("NewOdbcScanner");
+    return Status::OK();
+}
+
+// No ODBC-specific profile counters; only the base profile is initialized.
+Status NewOdbcScanNode::_init_profile() {
+    RETURN_IF_ERROR(VScanNode::_init_profile());
+    return Status::OK();
+}
+
+// Creates exactly one NewOdbcScanner that runs the whole ODBC query described
+// by _odbc_scan_node. Nothing is created if the node has already reached eos.
+Status NewOdbcScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_eos) {
+        return Status::OK();
+    }
+    auto* scanner = new NewOdbcScanner(_state, this, _limit_per_scanner,
+                                       _scanner_mem_tracker.get(), _odbc_scan_node);
+    // Registered in _scanner_pool, which presumably owns and frees it.
+    _scanner_pool.add(scanner);
+    RETURN_IF_ERROR(scanner->prepare(_state));
+    scanners->push_back(scanner);
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h
new file mode 100644
index 0000000000..40d2bdd4bd
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exec/scan/vscan_node.h"
+namespace doris::vectorized {
+// Vectorized ODBC scan node built on the VScanNode framework. ExecNode creates
+// it for TPlanNodeType::ODBC_SCAN_NODE when vectorized execution and
+// config::enable_new_scan_node are both enabled.
+class NewOdbcScanNode : public VScanNode {
+public:
+    NewOdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+
+    // Prepares the base node and creates the memory tracker handed to scanners.
+    Status prepare(RuntimeState* state) override;
+
+    // Display name of the node, including the ODBC table name.
+    std::string get_name() override;
+
+    // no use: intentionally a no-op. The single scanner is built from the
+    // plan's TOdbcScanNode, not from scan ranges.
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {}
+
+protected:
+    Status _init_profile() override;
+    // Creates one NewOdbcScanner (none if the node is already at eos).
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    // Table name, used only for get_name().
+    std::string _table_name;
+    // Copy of the ODBC plan parameters passed to the scanner.
+    TOdbcScanNode _odbc_scan_node;
+    // Memory tracker shared by the scanners; created in prepare().
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp
new file mode 100644
index 0000000000..11943d8f04
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_odbc_scanner.h"
+
+#include "common/status.h"
+#include "exec/text_converter.hpp"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+static const std::string NEW_SCANNER_TYPE = "NewOdbcScanner";
+
+namespace doris::vectorized {
+NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                               MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker),
+          _is_init(false),
+          _odbc_eof(false),
+          _table_name(odbc_scan_node.table_name),
+          _connect_string(odbc_scan_node.connect_string),
+          _query_string(odbc_scan_node.query_string),
+          _tuple_id(odbc_scan_node.tuple_id),
+          _tuple_desc(nullptr) {}
+
+// Resolves the output tuple descriptor and builds the ODBC connector and the
+// text converter. Returns OK immediately if the scanner is already initialized.
+Status NewOdbcScanner::prepare(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare";
+
+    if (_is_init) {
+        return Status::OK();
+    }
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    // get tuple desc
+    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+    if (nullptr == _tuple_desc) {
+        return Status::InternalError("Failed to get tuple descriptor.");
+    }
+
+    // The connect/query strings are only needed by the connector, so they are
+    // moved into its parameters instead of copied.
+    _odbc_param.connect_string = std::move(_connect_string);
+    _odbc_param.query_string = std::move(_query_string);
+    _odbc_param.tuple_desc = _tuple_desc;
+
+    _odbc_connector.reset(new (std::nothrow) ODBCConnector(_odbc_param));
+
+    if (_odbc_connector == nullptr) {
+        return Status::InternalError("new a odbc scanner failed.");
+    }
+
+    // '\\' is presumably the escape character used by the text converter.
+    _text_converter.reset(new (std::nothrow) TextConverter('\\'));
+
+    if (_text_converter == nullptr) {
+        return Status::InternalError("new a text convertor failed.");
+    }
+
+    _is_init = true;
+
+    return Status::OK();
+}
+
+// Opens the ODBC connection and runs the query. prepare() must have succeeded.
+Status NewOdbcScanner::open(RuntimeState* state) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open";
+
+    if (nullptr == state) {
+        return Status::InternalError("input pointer is null.");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(VScanner::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    RETURN_IF_ERROR(_odbc_connector->open());
+    RETURN_IF_ERROR(_odbc_connector->query());
+    // check materialize slot num
+
+    return Status::OK();
+}
+
+// Fills `block` with at most state->batch_size() rows from the ODBC cursor.
+// Values of materialized slots are converted from their text form; a NULL value
+// in a non-nullable slot, an over-long value, or a failed conversion is an error.
+// *eof is set when get_next_row() reports end of data while block->rows() is 0.
+Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl";
+
+    if (nullptr == state || nullptr == block || nullptr == eof) {
+        return Status::InternalError("input is NULL pointer");
+    }
+
+    if (!_is_init) {
+        return Status::InternalError("used before initialize.");
+    }
+    RETURN_IF_CANCELLED(state);
+
+    if (_odbc_eof) {
+        *eof = true;
+        return Status::OK();
+    }
+
+    const size_t column_size = _tuple_desc->slots().size();
+    std::vector<MutableColumnPtr> columns(column_size);
+
+    const bool mem_reuse = block->mem_reuse();
+    // only empty block should be here
+    DCHECK(block->rows() == 0);
+
+    do {
+        RETURN_IF_CANCELLED(state);
+
+        columns.resize(column_size);
+        for (size_t i = 0; i < column_size; ++i) {
+            if (mem_reuse) {
+                columns[i] = std::move(*block->get_by_position(i).column).mutate();
+            } else {
+                columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+            }
+        }
+
+        // Count rows explicitly rather than reading columns[0]->size(): slot 0
+        // may be non-materialized (its column is never appended to), and a
+        // tuple without slots would make columns[0] out of range.
+        for (int rows_read = 0; rows_read < state->batch_size(); ++rows_read) {
+            RETURN_IF_ERROR(_odbc_connector->get_next_row(&_odbc_eof));
+
+            if (_odbc_eof) {
+                if (block->rows() == 0) {
+                    *eof = true;
+                }
+                break;
+            }
+
+            // Read one row. Connector columns are indexed by materialized slots
+            // only, hence the separate materialized_column_index counter.
+            int materialized_column_index = 0;
+            for (size_t column_index = 0; column_index < column_size; ++column_index) {
+                auto slot_desc = _tuple_desc->slots()[column_index];
+                // because the fe planner filter the non_materialize column
+                if (!slot_desc->is_materialized()) {
+                    continue;
+                }
+                const auto& column_data =
+                        _odbc_connector->get_column_data(materialized_column_index);
+
+                char* value_data = static_cast<char*>(column_data.target_value_ptr);
+                int value_len = column_data.strlen_or_ind;
+
+                if (value_len == SQL_NULL_DATA) {
+                    if (slot_desc->is_nullable()) {
+                        columns[column_index]->insert_default();
+                    } else {
+                        return Status::InternalError(
+                                "nonnull column contains nullptr. table={}, column={}", _table_name,
+                                slot_desc->col_name());
+                    }
+                } else if (value_len > column_data.buffer_length) {
+                    return Status::InternalError(
+                            "column value length longer than buffer length. "
+                            "table={}, column={}, buffer_length={}",
+                            _table_name, slot_desc->col_name(), column_data.buffer_length);
+                } else {
+                    if (!_text_converter->write_column(slot_desc, &columns[column_index],
+                                                       value_data, value_len, true, false)) {
+                        // Print exactly value_len bytes instead of relying on a
+                        // terminating NUL in the connector's buffer.
+                        const std::string value_text(value_data, value_len > 0 ? value_len : 0);
+                        std::stringstream ss;
+                        ss << "Fail to convert odbc value:'" << value_text << "' to "
+                           << slot_desc->type() << " on column:`" << slot_desc->col_name() + "`";
+                        return Status::InternalError(ss.str());
+                    }
+                }
+                materialized_column_index++;
+            }
+        }
+
+        // Hand the filled columns back to the block. Without mem_reuse they are
+        // moved into new block entries. With mem_reuse, `columns` holds the
+        // mutable handles obtained from the block's own columns via mutate(),
+        // and those extra references are dropped here.
+        if (!mem_reuse) {
+            size_t column_index = 0;
+            for (const auto slot_desc : _tuple_desc->slots()) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        } else {
+            columns.clear();
+        }
+        VLOG_ROW << "VOdbcScanNode output rows: " << block->rows();
+    } while (block->rows() == 0 && !(*eof));
+
+    return Status::OK();
+}
+
+Status NewOdbcScanner::close(RuntimeState* state) {
+    RETURN_IF_ERROR(VScanner::close(state));
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h
new file mode 100644
index 0000000000..34cedb8095
--- /dev/null
+++ b/be/src/vec/exec/scan/new_odbc_scanner.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/odbc_connector.h"
+#include "exec/text_converter.h"
+#include "vec/exec/scan/new_odbc_scan_node.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+// Scanner used by NewOdbcScanNode. It reads one ODBC table using the connect
+// string, query string and output tuple id from the plan's TOdbcScanNode.
+class NewOdbcScanner : public VScanner {
+public:
+    NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int64_t limit,
+                   MemTracker* mem_tracker, const TOdbcScanNode& odbc_scan_node);
+
+    // Builds the connector and text converter. It must succeed before open() or
+    // _get_block_impl(), which return an error otherwise. It returns OK
+    // immediately if the scanner is already initialized.
+    Status prepare(RuntimeState* state);
+
+    // Opens the ODBC connection and executes the query.
+    Status open(RuntimeState* state) override;
+
+    // Close the odbc_scanner, and report errors.
+    Status close(RuntimeState* state) override;
+
+protected:
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    // True once prepare() has completed successfully.
+    bool _is_init;
+
+    // Set by _odbc_connector->get_next_row() once there are no more rows to read.
+    bool _odbc_eof;
+
+    // Table name, used in error messages.
+    std::string _table_name;
+
+    // Moved into _odbc_param by prepare().
+    std::string _connect_string;
+
+    // Moved into _odbc_param by prepare().
+    std::string _query_string;
+    // Tuple id resolved in prepare() to set _tuple_desc;
+    TupleId _tuple_id;
+
+    // Descriptor of tuples read from ODBC table.
+    const TupleDescriptor* _tuple_desc;
+
+    // Scanner of ODBC.
+    std::unique_ptr<ODBCConnector> _odbc_connector;
+    // Parameters the connector is constructed from.
+    ODBCConnectorParam _odbc_param;
+    // Helper class for converting text to other types;
+    std::unique_ptr<TextConverter> _text_converter;
+};
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to