This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 935ef5a598 [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027 935ef5a598 is described below commit 935ef5a59848f0b98c0c13d57f969ea3f60067f9 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Mon Oct 10 09:56:38 2022 +0800 [feature-wip](new-scan) Add new ES scanner and new ES scan node #13027 --- be/src/exec/exec_node.cpp | 10 +- be/src/runtime/plan_fragment_executor.cpp | 4 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/exec/scan/new_es_scan_node.cpp | 245 ++++++++++++++++++++++++++++++ be/src/vec/exec/scan/new_es_scan_node.h | 70 +++++++++ be/src/vec/exec/scan/new_es_scanner.cpp | 200 ++++++++++++++++++++++++ be/src/vec/exec/scan/new_es_scanner.h | 67 ++++++++ be/src/vec/exec/scan/new_odbc_scanner.cpp | 2 +- 8 files changed, 596 insertions(+), 4 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index a70db3f510..39e917ec6b 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -61,6 +61,7 @@ #include "vec/core/block.h" #include "vec/exec/file_scan_node.h" #include "vec/exec/join/vhash_join_node.h" +#include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" @@ -477,7 +478,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::ES_HTTP_SCAN_NODE: if (state->enable_vectorized_exec()) { - *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs)); + if (config::enable_new_scan_node) { + *node = pool->add(new vectorized::NewEsScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs)); + } } else { *node = pool->add(new EsHttpScanNode(pool, tnode, descs)); } @@ -735,7 +740,8 @@ void ExecNode::try_do_aggregate_serde_improve() { ExecNode* child0 = 
agg_node[0]->_children[0]; if (typeid(*child0) == typeid(vectorized::NewOlapScanNode) || typeid(*child0) == typeid(vectorized::NewFileScanNode) || - typeid(*child0) == typeid(vectorized::NewOdbcScanNode) + typeid(*child0) == typeid(vectorized::NewOdbcScanNode) || + typeid(*child0) == typeid(vectorized::NewEsScanNode) #ifdef LIBJVM || typeid(*child0) == typeid(vectorized::NewJdbcScanNode) #endif diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index cff6630c23..14baf8ae06 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -46,6 +46,7 @@ #include "util/telemetry/telemetry.h" #include "util/uid_util.h" #include "vec/core/block.h" +#include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/new_odbc_scan_node.h" @@ -171,7 +172,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, ExecNode* node = scan_nodes[i]; if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || typeid(*node) == typeid(vectorized::NewFileScanNode) || - typeid(*node) == typeid(vectorized::NewOdbcScanNode) + typeid(*node) == typeid(vectorized::NewOdbcScanNode) || + typeid(*node) == typeid(vectorized::NewEsScanNode) #ifdef LIBJVM || typeid(*node) == typeid(vectorized::NewJdbcScanNode) #endif diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 87900dcd0b..d91eec109f 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -257,6 +257,8 @@ set(VEC_FILES exec/scan/new_odbc_scan_node.cpp exec/scan/new_jdbc_scanner.cpp exec/scan/new_jdbc_scan_node.cpp + exec/scan/new_es_scanner.cpp + exec/scan/new_es_scan_node.cpp ) add_library(Vec STATIC diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp new file mode 100644 index 0000000000..31e439281d --- /dev/null +++ 
b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_es_scan_node.h" + +#include "exec/es/es_query_builder.h" +#include "exec/es/es_scroll_query.h" +#include "vec/exec/scan/new_es_scanner.h" +#include "vec/utils/util.hpp" + +static const std::string NEW_SCAN_NODE_TYPE = "NewEsScanNode"; + +// Pick the ES host whose hostname equals this backend's local host, else the first entry (es_hosts must be non-empty) +static std::string get_host_port(const std::vector<doris::TNetworkAddress>& es_hosts) { + std::string host_port; + std::string localhost = doris::BackendOptions::get_localhost(); + + doris::TNetworkAddress host = es_hosts[0]; + for (auto& es_host : es_hosts) { + if (es_host.hostname == localhost) { + host = es_host; + break; + } + } + + host_port = host.hostname; + host_port += ":"; + host_port += std::to_string(host.port); + return host_port; +} + +namespace doris::vectorized { + +NewEsScanNode::NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs), + _tuple_id(tnode.es_scan_node.tuple_id), + _tuple_desc(nullptr), + _scanner_mem_tracker(nullptr), + _es_profile(nullptr) { + _output_tuple_id = tnode.es_scan_node.tuple_id; +} + +std::string
NewEsScanNode::get_name() { + return fmt::format("VNewEsScanNode"); +} + +Status NewEsScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::init(tnode, state)); + + // use TEsScanNode + _properties = tnode.es_scan_node.properties; + + if (tnode.es_scan_node.__isset.docvalue_context) { + _docvalue_context = tnode.es_scan_node.docvalue_context; + } + + if (tnode.es_scan_node.__isset.fields_context) { + _fields_context = tnode.es_scan_node.fields_context; + } + return Status::OK(); +} + +Status NewEsScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << NEW_SCAN_NODE_TYPE << "::prepare"; + RETURN_IF_ERROR(VScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + _scanner_mem_tracker = std::make_unique<MemTracker>("NewEsScanner"); + + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor, _tuple_id={}", _tuple_id); + } + + // set up column name vector for ESScrollQueryBuilder + for (auto slot_desc : _tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + _column_names.push_back(slot_desc->col_name()); + } + + return Status::OK(); +} + +void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { + for (auto& es_scan_range : scan_ranges) { + DCHECK(es_scan_range.scan_range.__isset.es_scan_range); + _scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range)); + } +} + +Status NewEsScanNode::_init_profile() { + RETURN_IF_ERROR(VScanNode::_init_profile()); + _es_profile.reset(new RuntimeProfile("EsIterator")); + _scanner_profile->add_child(_es_profile.get(), true, nullptr); + + _rows_read_counter = ADD_COUNTER(_es_profile, "RowsRead", TUnit::UNIT); + _read_timer = ADD_TIMER(_es_profile, "TotalRawReadTime(*)"); + _materialize_timer = ADD_TIMER(_es_profile, "MaterializeTupleTime(*)"); + return Status::OK(); +} + +Status
NewEsScanNode::_process_conjuncts() { + RETURN_IF_ERROR(VScanNode::_process_conjuncts()); + if (_eos) { + return Status::OK(); + } + + // FE decides via enable_new_es_dsl whether it generates the ES query DSL itself (kept for easy rollback). If it did, skip the BE-side generation below, which can be deleted once the FE path is stable + if (_properties.find(ESScanReader::KEY_QUERY_DSL) != _properties.end()) { + return Status::OK(); + } + + // if a conjunct is constant, evaluate it directly and set eos = true when it is NULL or false + for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { + if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { + void* value = _conjunct_ctxs[conj_idx]->get_value(nullptr); + if (value == nullptr || *reinterpret_cast<bool*>(value) == false) { + _eos = true; + } + } + } + RETURN_IF_ERROR(build_conjuncts_list()); + // remove those predicates which ES cannot support + std::vector<bool> list; + BooleanQueryBuilder::validate(_predicates, &list); + + DCHECK(list.size() == _predicate_to_conjunct.size()); + for (int i = list.size() - 1; i >= 0; i--) { + if (!list[i]) { + _predicate_to_conjunct.erase(_predicate_to_conjunct.begin() + i); + _predicates.erase(_predicates.begin() + i); + } + } + + // drop (close and erase) the conjuncts that became ES predicates; ES evaluates them instead + for (int i = _predicate_to_conjunct.size() - 1; i >= 0; i--) { + int conjunct_index = _predicate_to_conjunct[i]; + _conjunct_ctxs[conjunct_index]->close(_state); + _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index); + } + + auto checker = [&](int index) { + return _conjunct_to_predicate[index] != -1 && list[_conjunct_to_predicate[index]]; + }; + + // _peel_pushed_vconjunct + if (_vconjunct_ctx_ptr == nullptr) { + return Status::OK(); + } + int leaf_index = 0; + vectorized::VExpr* conjunct_expr_root = (*_vconjunct_ctx_ptr)->root(); + if (conjunct_expr_root != nullptr) { + vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct( + _state, *_vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker); + if (new_conjunct_expr_root
== nullptr) { + (*_vconjunct_ctx_ptr)->close(_state); + _vconjunct_ctx_ptr.reset(nullptr); + } else { + (*_vconjunct_ctx_ptr)->set_root(new_conjunct_expr_root); + } + } + return Status::OK(); +} + +Status NewEsScanNode::_init_scanners(std::list<VScanner*>* scanners) { + if (_scan_ranges.empty()) { + _eos = true; + return Status::OK(); + } + + for (auto& es_scan_range : _scan_ranges) { + // Collect the information from scan range to properties (per-iteration map: the scanner must copy it, not keep a reference) + std::map<std::string, std::string> properties(_properties); + properties[ESScanReader::KEY_INDEX] = es_scan_range->index; + if (es_scan_range->__isset.type) { + properties[ESScanReader::KEY_TYPE] = es_scan_range->type; + } + properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range->shard_id); + properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_state->batch_size()); + properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range->es_hosts); + // push down limit to Elasticsearch + // if predicate in _conjunct_ctxs can not be processed by Elasticsearch, we can not push down limit operator to Elasticsearch + if (limit() != -1 && limit() <= _state->batch_size() && _conjunct_ctxs.empty()) { + properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit()); + } + + bool doc_value_mode = false; + properties[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build( + properties, _column_names, _predicates, _docvalue_context, &doc_value_mode); + + NewEsScanner* scanner = + new NewEsScanner(_state, this, _limit_per_scanner, _mem_tracker.get(), _tuple_id, + properties, _docvalue_context, doc_value_mode); + + _scanner_pool.add(scanner); + RETURN_IF_ERROR(scanner->prepare(_state)); + scanners->push_back(static_cast<VScanner*>(scanner)); + } + return Status::OK(); +} + +// Build one EsPredicate per conjunct and record which conjuncts ES can evaluate (-1 = not pushable) +Status NewEsScanNode::build_conjuncts_list() { + Status status = Status::OK(); + _conjunct_to_predicate.resize(_conjunct_ctxs.size()); + + for (int i = 0; i < _conjunct_ctxs.size(); ++i) { + EsPredicate* predicate =
_pool->add(new EsPredicate(_conjunct_ctxs[i], _tuple_desc, _pool)); + predicate->set_field_context(_fields_context); + status = predicate->build_disjuncts_list(); + if (status.ok()) { + _conjunct_to_predicate[i] = _predicate_to_conjunct.size(); + _predicate_to_conjunct.push_back(i); + + _predicates.push_back(predicate); + } else { + _conjunct_to_predicate[i] = -1; + + VLOG_CRITICAL << status.get_error_msg(); + status = predicate->get_es_query_status(); + if (!status.ok()) { + LOG(WARNING) << status.get_error_msg(); + return status; + } + } + } + + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h new file mode 100644 index 0000000000..88586f7383 --- /dev/null +++ b/be/src/vec/exec/scan/new_es_scan_node.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include "exec/es/es_predicate.h" +#include "vec/exec/scan/new_es_scanner.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +class NewEsScanNode : public VScanNode { +public: + friend class NewEsScanner; + +public: + NewEsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~NewEsScanNode() override = default; + + std::string get_name() override; + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + +protected: + Status _init_profile() override; + Status _process_conjuncts() override; + Status _init_scanners(std::list<VScanner*>* scanners) override; + +private: + Status build_conjuncts_list(); + +private: + TupleId _tuple_id; + TupleDescriptor* _tuple_desc; + + std::map<std::string, std::string> _properties; + std::map<std::string, std::string> _fields_context; + std::map<std::string, std::string> _docvalue_context; + + std::vector<std::unique_ptr<TEsScanRange>> _scan_ranges; + std::vector<std::string> _column_names; + + std::vector<EsPredicate*> _predicates; + std::vector<int> _predicate_to_conjunct; + std::vector<int> _conjunct_to_predicate; + + std::unique_ptr<MemTracker> _scanner_mem_tracker; + + // Profile; the counters stay nullptr until _init_profile() creates them + std::unique_ptr<RuntimeProfile> _es_profile; + RuntimeProfile::Counter* _rows_read_counter = nullptr; + RuntimeProfile::Counter* _read_timer = nullptr; + RuntimeProfile::Counter* _materialize_timer = nullptr; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp new file mode 100644 index 0000000000..03f4526a23 --- /dev/null +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/scan/new_es_scanner.h" + +#include "vec/exec/scan/new_es_scan_node.h" + +static const std::string NEW_SCANNER_TYPE = "NewEsScanner"; + +namespace doris::vectorized { + +NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, + MemTracker* mem_tracker, TupleId tuple_id, + const std::map<std::string, std::string>& properties, + const std::map<std::string, std::string>& docvalue_context, + bool doc_value_mode) + : VScanner(state, static_cast<VScanNode*>(parent), limit, mem_tracker), + _is_init(false), + _es_eof(false), + _properties(properties), + _line_eof(false), + _batch_eof(false), + _tuple_id(tuple_id), + _tuple_desc(nullptr), + _mem_pool(nullptr), + _es_reader(nullptr), + _es_scroll_parser(nullptr), + _docvalue_context(docvalue_context), + _doc_value_mode(doc_value_mode) {} + +Status NewEsScanner::prepare(RuntimeState* state) { + VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; + + if (_is_init) { + return Status::OK(); + } + + if (nullptr == state) { + return Status::InternalError("input pointer is null."); + } + + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (nullptr == _tuple_desc) { + return Status::InternalError("Failed to get tuple descriptor,
tuple_id={}", _tuple_id); + } + + const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT); + _es_reader.reset(new ESScanReader(host, _properties, _doc_value_mode)); + if (_es_reader == nullptr) { + return Status::InternalError("Es reader construct failed."); + } + + _is_init = true; + return Status::OK(); +} + +Status NewEsScanner::open(RuntimeState* state) { + VLOG_CRITICAL << NEW_SCANNER_TYPE << "::open"; + + if (nullptr == state) { + return Status::InternalError("input pointer is null."); + } + + if (!_is_init) { + return Status::InternalError("used before initialize."); + } + + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(VScanner::open(state)); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + + RETURN_IF_ERROR(_es_reader->open()); + _mem_pool.reset(new MemPool(_mem_tracker)); + + return Status::OK(); +} + +Status NewEsScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { + VLOG_CRITICAL << NEW_SCANNER_TYPE << "::_get_block_impl"; + if (nullptr == state || nullptr == block || nullptr == eof) { + return Status::InternalError("input is NULL pointer"); + } + + if (!_is_init) { + return Status::InternalError("used before initialize."); + } + + RETURN_IF_CANCELLED(state); + + if (_es_eof == true) { + *eof = true; + return Status::OK(); + } + + auto column_size = _tuple_desc->slots().size(); + std::vector<MutableColumnPtr> columns(column_size); + + bool mem_reuse = block->mem_reuse(); + const int batch_size = state->batch_size(); + // only empty block should be here + DCHECK(block->rows() == 0); + + do { + columns.resize(column_size); + for (auto i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = std::move(*block->get_by_position(i).column).mutate(); + } else { + columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + + while (columns[0]->size() < batch_size && !_es_eof) { + RETURN_IF_CANCELLED(state); + // Get from scanner + RETURN_IF_ERROR(_get_next(columns)); + } +
+ // Move the fetched rows into `block` (or, on mem_reuse, clear the borrowed column ptrs) before + // checking eof: checking eof first would silently drop the final partial batch. + if (!mem_reuse) { + int column_index = 0; + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + VLOG_ROW << "NewEsScanner output rows: " << block->rows(); + + if (_es_eof == true) { + if (block->rows() == 0) { + *eof = true; + } + break; + } + } while (block->rows() == 0 && !(*eof)); + return Status::OK(); +} + +Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& columns) { + NewEsScanNode* new_es_scan_node = static_cast<NewEsScanNode*>(_parent); + SCOPED_TIMER(new_es_scan_node->_read_timer); + if (_line_eof && _batch_eof) { + _es_eof = true; + return Status::OK(); + } + + while (!_batch_eof) { + if (_line_eof || _es_scroll_parser == nullptr) { + RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser)); + if (_batch_eof) { + _es_eof = true; + return Status::OK(); + } + } + + SCOPED_TIMER(new_es_scan_node->_materialize_timer); + RETURN_IF_ERROR(_es_scroll_parser->fill_columns(_tuple_desc, columns, _mem_pool.get(), + &_line_eof, _docvalue_context)); + if (!_line_eof) { + COUNTER_UPDATE(new_es_scan_node->_rows_read_counter, 1); // count only materialized rows + break; + } + } + + return Status::OK(); +} + +Status NewEsScanner::close(RuntimeState* state) { + if (_is_closed) { + return Status::OK(); + } + + if (_es_reader != nullptr) { + _es_reader->close(); + } + + RETURN_IF_ERROR(VScanner::close(state)); + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h new file mode 100644 index 0000000000..2e776f08c9 --- /dev/null +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/es/es_scan_reader.h" +#include "exec/es/es_scroll_parser.h" +#include "runtime/runtime_state.h" +#include "vec/exec/scan/vscanner.h" + +namespace doris::vectorized { + +class NewEsScanNode; + +class NewEsScanner : public VScanner { +public: + NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t limit, MemTracker* mem_tracker, + TupleId tuple_id, const std::map<std::string, std::string>& properties, + const std::map<std::string, std::string>& docvalue_context, bool doc_value_mode); + + Status open(RuntimeState* state) override; + Status close(RuntimeState* state) override; + +public: + Status prepare(RuntimeState* state); + +protected: + Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; + +private: + Status _get_next(std::vector<vectorized::MutableColumnPtr>& columns); + +private: + bool _is_init; + bool _es_eof; + + std::map<std::string, std::string> _properties; // owned copy: callers may pass a short-lived (per scan range) map + + bool _line_eof; + bool _batch_eof; + + TupleId _tuple_id; + const TupleDescriptor* _tuple_desc; + + std::unique_ptr<MemPool> _mem_pool; + + std::unique_ptr<ESScanReader> _es_reader; + std::unique_ptr<ScrollParser> _es_scroll_parser; + + const std::map<std::string, std::string>& _docvalue_context; // borrowed: must outlive this scanner + bool _doc_value_mode; +}; +} //
namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index 118c35b76c..ee383ac636 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -200,7 +200,7 @@ Status NewOdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool* } else { columns.clear(); } - VLOG_ROW << "VOdbcScanNode output rows: " << block->rows(); + VLOG_ROW << "NewOdbcScanner output rows: " << block->rows(); } while (block->rows() == 0 && !(*eof)); return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org