This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new cd105bee0a [refactor](es) Clean es tcp scannode and related thrift definitions (#9553) cd105bee0a is described below commit cd105bee0afe4bb00a872562e547f72753e7e43a Author: yiguolei <676222...@qq.com> AuthorDate: Sat May 14 10:03:55 2022 +0800 [refactor](es) Clean es tcp scannode and related thrift definitions (#9553) PaloExternalDataSourceService was designed for es_scan_node, which uses the TCP protocol. However, the ES TCP protocol requires deploying a TCP jar into the ES code. Both the ES and Lucene versions have since been upgraded, and the TCP jar is no longer maintained, so I removed all the related code and thrift definitions. --- be/src/exec/CMakeLists.txt | 1 - be/src/exec/es/es_predicate.h | 1 - be/src/exec/es_scan_node.cpp | 868 --------------------- be/src/exec/es_scan_node.h | 88 --- be/src/exec/exec_node.cpp | 6 - be/src/exprs/expr_context.h | 1 - be/src/gen_cpp/CMakeLists.txt | 3 - be/src/runtime/client_cache.h | 3 - be/src/runtime/exec_env.h | 10 - be/src/runtime/exec_env_init.cpp | 5 - be/src/util/thrift_rpc_helper.cpp | 5 - be/test/CMakeLists.txt | 1 - be/test/exec/es_scan_node_test.cpp | 147 ---- gensrc/thrift/PaloExternalDataSourceService.thrift | 250 ------ 14 files changed, 1389 deletions(-) diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 709cb4de84..202ae767b0 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -57,7 +57,6 @@ set(EXEC_FILES csv_scan_node.cpp csv_scanner.cpp table_function_node.cpp - es_scan_node.cpp es_http_scan_node.cpp es_http_scanner.cpp es/es_predicate.cpp diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h index 28552d3eee..05c4e2b946 100644 --- a/be/src/exec/es/es_predicate.h +++ b/be/src/exec/es/es_predicate.h @@ -23,7 +23,6 @@ #include "exprs/slot_ref.h" #include "gen_cpp/Exprs_types.h" #include "gen_cpp/Opcodes_types.h" -#include "gen_cpp/PaloExternalDataSourceService_types.h" #include 
"runtime/descriptors.h" #include "runtime/primitive_type.h" #include "runtime/tuple.h" diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp deleted file mode 100644 index a0a89d3a68..0000000000 --- a/be/src/exec/es_scan_node.cpp +++ /dev/null @@ -1,868 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "es_scan_node.h" - -#include <gutil/strings/substitute.h> - -#include <boost/algorithm/string.hpp> -#include <string> - -#include "exprs/expr.h" -#include "exprs/expr_context.h" -#include "exprs/in_predicate.h" -#include "exprs/slot_ref.h" -#include "gen_cpp/Exprs_types.h" -#include "gen_cpp/PlanNodes_types.h" -#include "olap/olap_common.h" -#include "olap/utils.h" -#include "runtime/client_cache.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "runtime/string_value.h" -#include "runtime/tuple_row.h" -#include "service/backend_options.h" -#include "util/debug_util.h" -#include "util/runtime_profile.h" - -namespace doris { - -// $0 = column type (e.g. INT) -const std::string ERROR_INVALID_COL_DATA = - "Data source returned inconsistent column data. " - "Expected value of type $0 based on column metadata. 
This likely indicates a " - "problem with the data source library."; -const std::string ERROR_MEM_LIMIT_EXCEEDED = - "DataSourceScanNode::$0() failed to allocate " - "$1 bytes for $2."; - -EsScanNode::EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ScanNode(pool, tnode, descs), - _tuple_id(tnode.es_scan_node.tuple_id), - _tuple_desc(nullptr), - _env(nullptr), - _scan_range_idx(0) { - if (tnode.es_scan_node.__isset.properties) { - _properties = tnode.es_scan_node.properties; - } -} - -EsScanNode::~EsScanNode() {} - -Status EsScanNode::prepare(RuntimeState* state) { - VLOG_CRITICAL << "EsScanNode::Prepare"; - - RETURN_IF_ERROR(ScanNode::prepare(state)); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); - if (_tuple_desc == nullptr) { - std::stringstream ss; - ss << "es tuple descriptor is null, _tuple_id=" << _tuple_id; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - _env = state->exec_env(); - - return Status::OK(); -} - -Status EsScanNode::open(RuntimeState* state) { - VLOG_CRITICAL << "EsScanNode::Open"; - - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN)); - RETURN_IF_CANCELLED(state); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); - RETURN_IF_ERROR(ExecNode::open(state)); - - // TExtOpenParams.row_schema - std::vector<TExtColumnDesc> cols; - for (const SlotDescriptor* slot : _tuple_desc->slots()) { - TExtColumnDesc col; - col.__set_name(slot->col_name()); - col.__set_type(slot->type().to_thrift()); - cols.emplace_back(std::move(col)); - } - TExtTableSchema row_schema; - row_schema.cols = std::move(cols); - row_schema.__isset.cols = true; - - // TExtOpenParams.predicates - std::vector<vector<TExtPredicate>> predicates; - std::vector<int> predicate_to_conjunct; - for (int i = 0; i < _conjunct_ctxs.size(); ++i) { - VLOG_CRITICAL << "conjunct: " 
<< _conjunct_ctxs[i]->root()->debug_string(); - std::vector<TExtPredicate> disjuncts; - if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) { - predicates.emplace_back(std::move(disjuncts)); - predicate_to_conjunct.push_back(i); - } - } - - // open every scan range - std::vector<int> conjunct_accepted_times(_conjunct_ctxs.size(), 0); - for (int i = 0; i < _scan_ranges.size(); ++i) { - TEsScanRange& es_scan_range = _scan_ranges[i]; - - if (es_scan_range.es_hosts.empty()) { - std::stringstream ss; - ss << "es fail to open: hosts empty"; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - - // TExtOpenParams - TExtOpenParams params; - params.__set_query_id(state->query_id()); - _properties["index"] = es_scan_range.index; - if (es_scan_range.__isset.type) { - _properties["type"] = es_scan_range.type; - } - _properties["shard_id"] = std::to_string(es_scan_range.shard_id); - params.__set_properties(_properties); - params.__set_row_schema(row_schema); - params.__set_batch_size(state->batch_size()); - params.__set_predicates(predicates); - TExtOpenResult result; - - // choose an es node, local is the first choice - std::string localhost = BackendOptions::get_localhost(); - bool is_success = false; - for (int j = 0; j < 2; ++j) { - for (auto& es_host : es_scan_range.es_hosts) { - if ((j == 0 && es_host.hostname != localhost) || - (j == 1 && es_host.hostname == localhost)) { - continue; - } - Status status = open_es(es_host, result, params); - if (status.ok()) { - is_success = true; - _addresses.push_back(es_host); - _scan_handles.push_back(result.scan_handle); - if (result.__isset.accepted_conjuncts) { - for (int index : result.accepted_conjuncts) { - conjunct_accepted_times[predicate_to_conjunct[index]]++; - } - } - break; - } else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) { - // if shard not found, try other nodes - LOG(WARNING) << "shard not found on es node: " - << ", address=" << es_host << ", scan_range_idx=" 
<< i - << ", try other nodes"; - } else { - LOG(WARNING) << "es open error: scan_range_idx=" << i << ", address=" << es_host - << ", msg=" << status.get_error_msg(); - return status; - } - } - if (is_success) { - break; - } - } - - if (!is_success) { - std::stringstream ss; - ss << "es open error: scan_range_idx=" << i << ", can't find shard on any node"; - return Status::InternalError(ss.str()); - } - } - - // remove those conjuncts that accepted by all scan ranges - for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) { - int conjunct_index = predicate_to_conjunct[i]; - if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) { - _pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index)); - _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index); - } - } - - for (int i = 0; i < _conjunct_ctxs.size(); ++i) { - if (!check_left_conjuncts(_conjunct_ctxs[i]->root())) { - return Status::InternalError( - "esquery could only be executed on es, but could not push down to es"); - } - } - - return Status::OK(); -} - -Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - VLOG_CRITICAL << "EsScanNode::GetNext"; - - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); - RETURN_IF_CANCELLED(state); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker()); - - // create tuple - MemPool* tuple_pool = row_batch->tuple_data_pool(); - int64_t tuple_buffer_size; - uint8_t* tuple_buffer = nullptr; - RETURN_IF_ERROR( - row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer)); - Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer); - - // get batch - TExtGetNextResult result; - RETURN_IF_ERROR(get_next_from_es(result)); - _offsets[_scan_range_idx] += result.rows.num_rows; - - // convert - VLOG_CRITICAL << "begin to convert: scan_range_idx=" << _scan_range_idx - << ", num_rows=" << result.rows.num_rows; - 
std::vector<TExtColumnData>& cols = result.rows.cols; - // indexes of the next non-null value in the row batch, per column. - std::vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0); - for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) { - if (reached_limit()) { - *eos = true; - break; - } - RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx)); - TupleRow* tuple_row = row_batch->get_row(row_batch->add_row()); - tuple_row->set_tuple(0, tuple); - if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) { - row_batch->commit_last_row(); - tuple = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple) + - _tuple_desc->byte_size()); - ++_num_rows_returned; - } - } - - VLOG_CRITICAL << "finish one batch: num_rows=" << row_batch->num_rows(); - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - if (result.__isset.eos && result.eos) { - VLOG_CRITICAL << "es finish one scan_range: scan_range_idx=" << _scan_range_idx; - ++_scan_range_idx; - } - if (_scan_range_idx == _scan_ranges.size()) { - *eos = true; - } - - return Status::OK(); -} - -Status EsScanNode::close(RuntimeState* state) { - if (is_closed()) return Status::OK(); - VLOG_CRITICAL << "EsScanNode::Close"; - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); - SCOPED_TIMER(_runtime_profile->total_time_counter()); - Expr::close(_pushdown_conjunct_ctxs, state); - RETURN_IF_ERROR(ExecNode::close(state)); - for (int i = 0; i < _addresses.size(); ++i) { - TExtCloseParams params; - params.__set_scan_handle(_scan_handles[i]); - TExtCloseResult result; - -#ifndef BE_TEST - const TNetworkAddress& address = _addresses[i]; - try { - Status status; - ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache(); - ExtDataSourceServiceConnection client(client_cache, address, 10000, &status); - if (!status.ok()) { - LOG(WARNING) << "es create client error: scan_range_idx=" << i - << ", address=" << 
address << ", msg=" << status.get_error_msg(); - return status; - } - - try { - VLOG_CRITICAL << "es close param=" << apache::thrift::ThriftDebugString(params); - client->close(result, params); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "es close retrying, because: " << e.what(); - RETURN_IF_ERROR(client.reopen()); - client->close(result, params); - } - } catch (apache::thrift::TException& e) { - std::stringstream ss; - ss << "es close error: scan_range_idx=" << i << ", msg=" << e.what(); - LOG(WARNING) << ss.str(); - return Status::ThriftRpcError(ss.str()); - } - - VLOG_CRITICAL << "es close result=" << apache::thrift::ThriftDebugString(result); - Status status(result.status); - if (!status.ok()) { - LOG(WARNING) << "es close error: : scan_range_idx=" << i - << ", msg=" << status.get_error_msg(); - return status; - } -#else - TStatus status; - result.__set_status(status); -#endif - } - - return Status::OK(); -} - -void EsScanNode::debug_string(int indentation_level, std::stringstream* out) const { - *out << string(indentation_level * 2, ' '); - *out << "EsScanNode(tupleid=" << _tuple_id; - *out << ")" << std::endl; - - for (int i = 0; i < _children.size(); ++i) { - _children[i]->debug_string(indentation_level + 1, out); - } -} - -Status EsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { - for (int i = 0; i < scan_ranges.size(); ++i) { - TScanRangeParams scan_range = scan_ranges[i]; - DCHECK(scan_range.scan_range.__isset.es_scan_range); - TEsScanRange es_scan_range = scan_range.scan_range.es_scan_range; - _scan_ranges.push_back(es_scan_range); - } - - _offsets.resize(scan_ranges.size(), 0); - return Status::OK(); -} - -Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result, - TExtOpenParams& params) { - VLOG_CRITICAL << "es open param=" << apache::thrift::ThriftDebugString(params); -#ifndef BE_TEST - try { - ExtDataSourceServiceClientCache* client_cache = 
_env->extdatasource_client_cache(); - Status status; - ExtDataSourceServiceConnection client(client_cache, address, 10000, &status); - if (!status.ok()) { - std::stringstream ss; - ss << "es create client error: address=" << address - << ", msg=" << status.get_error_msg(); - return Status::InternalError(ss.str()); - } - - try { - client->open(result, params); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "es open retrying, because: " << e.what(); - RETURN_IF_ERROR(client.reopen()); - client->open(result, params); - } - VLOG_CRITICAL << "es open result=" << apache::thrift::ThriftDebugString(result); - return Status(result.status); - } catch (apache::thrift::TException& e) { - std::stringstream ss; - ss << "es open error: address=" << address << ", msg=" << e.what(); - return Status::InternalError(ss.str()); - } -#else - TStatus status; - result.__set_status(status); - result.__set_scan_handle("0"); - return Status(status); -#endif -} - -// legacy conjuncts must not contain match function -bool EsScanNode::check_left_conjuncts(Expr* conjunct) { - if (is_match_func(conjunct)) { - return false; - } else { - int num_children = conjunct->get_num_children(); - for (int child_idx = 0; child_idx < num_children; ++child_idx) { - if (!check_left_conjuncts(conjunct->get_child(child_idx))) { - return false; - } - } - return true; - } -} - -bool EsScanNode::ignore_cast(SlotDescriptor* slot, Expr* expr) { - if (slot->type().is_date_type() && expr->type().is_date_type()) { - return true; - } - if (slot->type().is_string_type() && expr->type().is_string_type()) { - return true; - } - return false; -} - -bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct, - std::vector<TExtPredicate>& disjuncts) { - if (TExprNodeType::BINARY_PRED == conjunct->node_type()) { - if (conjunct->children().size() != 2) { - VLOG_CRITICAL << "get disjuncts fail: number of children is not 2"; - return false; - } - SlotRef* slotRef; - TExprOpcode::type op; 
- Expr* expr; - if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) { - expr = conjunct->get_child(1); - slotRef = (SlotRef*)(conjunct->get_child(0)); - op = conjunct->op(); - } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) { - expr = conjunct->get_child(0); - slotRef = (SlotRef*)(conjunct->get_child(1)); - op = conjunct->op(); - } else { - VLOG_CRITICAL << "get disjuncts fail: no SLOT_REF child"; - return false; - } - - SlotDescriptor* slot_desc = get_slot_desc(slotRef); - if (slot_desc == nullptr) { - VLOG_CRITICAL << "get disjuncts fail: slot_desc is null"; - return false; - } - - TExtLiteral literal; - if (!to_ext_literal(context, expr, &literal)) { - VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type=" - << expr->node_type(); - return false; - } - - TExtColumnDesc columnDesc; - columnDesc.__set_name(slot_desc->col_name()); - columnDesc.__set_type(slot_desc->type().to_thrift()); - TExtBinaryPredicate binaryPredicate; - binaryPredicate.__set_col(columnDesc); - binaryPredicate.__set_op(op); - binaryPredicate.__set_value(std::move(literal)); - TExtPredicate predicate; - predicate.__set_node_type(TExprNodeType::BINARY_PRED); - predicate.__set_binary_predicate(binaryPredicate); - disjuncts.push_back(std::move(predicate)); - return true; - } else if (is_match_func(conjunct)) { - // if this is a function call expr and function name is match, then push - // down it to es - TExtFunction match_function; - match_function.__set_func_name(conjunct->fn().name.function_name); - std::vector<TExtLiteral> query_conditions; - - TExtLiteral literal; - if (!to_ext_literal(context, conjunct->get_child(1), &literal)) { - VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type=" - << conjunct->get_child(1)->node_type(); - return false; - } - - query_conditions.push_back(std::move(literal)); - match_function.__set_values(query_conditions); - TExtPredicate predicate; - 
predicate.__set_node_type(TExprNodeType::FUNCTION_CALL); - predicate.__set_ext_function(match_function); - disjuncts.push_back(std::move(predicate)); - return true; - } else if (TExprNodeType::IN_PRED == conjunct->node_type()) { - // the op code maybe FILTER_NEW_IN, it means there is function in list - // like col_a in (abs(1)) - if (TExprOpcode::FILTER_IN != conjunct->op() && - TExprOpcode::FILTER_NOT_IN != conjunct->op()) { - return false; - } - TExtInPredicate ext_in_predicate; - std::vector<TExtLiteral> in_pred_values; - InPredicate* pred = static_cast<InPredicate*>(conjunct); - ext_in_predicate.__set_is_not_in(pred->is_not_in()); - if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) { - return false; - } - - SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0)); - SlotDescriptor* slot_desc = get_slot_desc(slot_ref); - if (slot_desc == nullptr) { - return false; - } - TExtColumnDesc columnDesc; - columnDesc.__set_name(slot_desc->col_name()); - columnDesc.__set_type(slot_desc->type().to_thrift()); - ext_in_predicate.__set_col(columnDesc); - - if (pred->get_child(0)->type().type != slot_desc->type().type) { - if (!ignore_cast(slot_desc, pred->get_child(0))) { - return false; - } - } - - HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin(); - while (iter->has_next()) { - if (nullptr == iter->get_value()) { - return false; - } - TExtLiteral literal; - if (!to_ext_literal(slot_desc->type().type, const_cast<void*>(iter->get_value()), - &literal)) { - VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type=" - << slot_desc->type().type; - return false; - } - in_pred_values.push_back(literal); - iter->next(); - } - ext_in_predicate.__set_values(in_pred_values); - TExtPredicate predicate; - predicate.__set_node_type(TExprNodeType::IN_PRED); - predicate.__set_in_predicate(ext_in_predicate); - disjuncts.push_back(std::move(predicate)); - return true; - } else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) { - if 
(TExprOpcode::COMPOUND_OR != conjunct->op()) { - VLOG_CRITICAL << "get disjuncts fail: op is not COMPOUND_OR"; - return false; - } - if (!get_disjuncts(context, conjunct->get_child(0), disjuncts)) { - return false; - } - if (!get_disjuncts(context, conjunct->get_child(1), disjuncts)) { - return false; - } - return true; - } else { - VLOG_CRITICAL << "get disjuncts fail: node type is " << conjunct->node_type() - << ", should be BINARY_PRED or COMPOUND_PRED"; - return false; - } -} - -bool EsScanNode::is_match_func(Expr* conjunct) { - if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() && - conjunct->fn().name.function_name == "esquery") { - return true; - } - return false; -} - -SlotDescriptor* EsScanNode::get_slot_desc(SlotRef* slotRef) { - std::vector<SlotId> slot_ids; - slotRef->get_slot_ids(&slot_ids); - SlotDescriptor* slot_desc = nullptr; - for (SlotDescriptor* slot : _tuple_desc->slots()) { - if (slot->id() == slot_ids[0]) { - slot_desc = slot; - break; - } - } - return slot_desc; -} - -bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal) { - switch (expr->node_type()) { - case TExprNodeType::BOOL_LITERAL: - case TExprNodeType::INT_LITERAL: - case TExprNodeType::LARGE_INT_LITERAL: - case TExprNodeType::FLOAT_LITERAL: - case TExprNodeType::DECIMAL_LITERAL: - case TExprNodeType::STRING_LITERAL: - case TExprNodeType::DATE_LITERAL: - return to_ext_literal(expr->type().type, context->get_value(expr, nullptr), literal); - default: - return false; - } -} - -bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLiteral* literal) { - TExprNodeType::type node_type; - switch (slot_type) { - case TYPE_BOOLEAN: { - node_type = (TExprNodeType::BOOL_LITERAL); - TBoolLiteral bool_literal; - bool_literal.__set_value(*reinterpret_cast<bool*>(value)); - literal->__set_bool_literal(bool_literal); - break; - } - - case TYPE_TINYINT: { - node_type = (TExprNodeType::INT_LITERAL); - TIntLiteral int_literal; - 
int_literal.__set_value(*reinterpret_cast<int8_t*>(value)); - literal->__set_int_literal(int_literal); - break; - } - case TYPE_SMALLINT: { - node_type = (TExprNodeType::INT_LITERAL); - TIntLiteral int_literal; - int_literal.__set_value(*reinterpret_cast<int16_t*>(value)); - literal->__set_int_literal(int_literal); - break; - } - case TYPE_INT: { - node_type = (TExprNodeType::INT_LITERAL); - TIntLiteral int_literal; - int_literal.__set_value(*reinterpret_cast<int32_t*>(value)); - literal->__set_int_literal(int_literal); - break; - } - case TYPE_BIGINT: { - node_type = (TExprNodeType::INT_LITERAL); - TIntLiteral int_literal; - int_literal.__set_value(*reinterpret_cast<int64_t*>(value)); - literal->__set_int_literal(int_literal); - break; - } - - case TYPE_LARGEINT: { - node_type = (TExprNodeType::LARGE_INT_LITERAL); - TLargeIntLiteral large_int_literal; - large_int_literal.__set_value( - LargeIntValue::to_string(*reinterpret_cast<__int128*>(value))); - literal->__set_large_int_literal(large_int_literal); - break; - } - - case TYPE_FLOAT: { - node_type = (TExprNodeType::FLOAT_LITERAL); - TFloatLiteral float_literal; - float_literal.__set_value(*reinterpret_cast<float*>(value)); - literal->__set_float_literal(float_literal); - break; - } - case TYPE_DOUBLE: { - node_type = (TExprNodeType::FLOAT_LITERAL); - TFloatLiteral float_literal; - float_literal.__set_value(*reinterpret_cast<double*>(value)); - literal->__set_float_literal(float_literal); - break; - } - - case TYPE_DATE: - case TYPE_DATETIME: { - node_type = (TExprNodeType::DATE_LITERAL); - const DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value); - char str[MAX_DTVALUE_STR_LEN]; - date_value.to_string(str); - TDateLiteral date_literal; - date_literal.__set_value(str); - literal->__set_date_literal(date_literal); - break; - } - - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: { - node_type = (TExprNodeType::STRING_LITERAL); - TStringLiteral string_literal; - 
string_literal.__set_value((reinterpret_cast<StringValue*>(value))->debug_string()); - literal->__set_string_literal(string_literal); - break; - } - - default: { - DCHECK(false) << "Invalid type."; - return false; - } - } - literal->__set_node_type(node_type); - return true; -} - -Status EsScanNode::get_next_from_es(TExtGetNextResult& result) { - TExtGetNextParams params; - params.__set_scan_handle(_scan_handles[_scan_range_idx]); - params.__set_offset(_offsets[_scan_range_idx]); - - // getNext - const TNetworkAddress& address = _addresses[_scan_range_idx]; -#ifndef BE_TEST - try { - Status create_client_status; - ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache(); - ExtDataSourceServiceConnection client(client_cache, address, 10000, &create_client_status); - if (!create_client_status.ok()) { - LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx - << ", address=" << address - << ", msg=" << create_client_status.get_error_msg(); - return create_client_status; - } - - try { - VLOG_CRITICAL << "es get_next param=" << apache::thrift::ThriftDebugString(params); - client->getNext(result, params); - } catch (apache::thrift::transport::TTransportException& e) { - std::stringstream ss; - ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what(); - LOG(WARNING) << ss.str(); - RETURN_IF_ERROR(client.reopen()); - return Status::ThriftRpcError(ss.str()); - } - } catch (apache::thrift::TException& e) { - std::stringstream ss; - ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what(); - LOG(WARNING) << ss.str(); - return Status::ThriftRpcError(ss.str()); - } -#else - TStatus status; - result.__set_status(status); - result.__set_eos(true); - TExtColumnData col_data; - std::vector<bool> is_null; - is_null.push_back(false); - col_data.__set_is_null(is_null); - std::vector<int32_t> int_vals; - int_vals.push_back(1); - int_vals.push_back(2); - 
col_data.__set_int_vals(int_vals); - std::vector<TExtColumnData> cols; - cols.push_back(col_data); - TExtRowBatch rows; - rows.__set_cols(cols); - rows.__set_num_rows(2); - result.__set_rows(rows); - return Status(status); -#endif - - // check result - VLOG_CRITICAL << "es get_next result=" << apache::thrift::ThriftDebugString(result); - Status get_next_status(result.status); - if (!get_next_status.ok()) { - LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx - << ", address=" << address << ", msg=" << get_next_status.get_error_msg(); - return get_next_status; - } - if (!result.__isset.rows || !result.rows.__isset.num_rows) { - std::stringstream ss; - ss << "es get_next error: scan_range_idx=" << _scan_range_idx - << ", msg=rows or num_rows not in result"; - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - - return Status::OK(); -} - -Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple, - const std::vector<TExtColumnData>& cols, int row_idx, - std::vector<int>& cols_next_val_idx) { - tuple->init(_tuple_desc->byte_size()); - - for (int i = 0; i < _tuple_desc->slots().size(); ++i) { - const SlotDescriptor* slot_desc = _tuple_desc->slots()[i]; - - if (!slot_desc->is_materialized()) { - continue; - } - - void* slot = tuple->get_slot(slot_desc->tuple_offset()); - const TExtColumnData& col = cols[i]; - - if (col.is_null[row_idx]) { - tuple->set_null(slot_desc->null_indicator_offset()); - continue; - } else { - tuple->set_not_null(slot_desc->null_indicator_offset()); - } - - int val_idx = cols_next_val_idx[i]++; - switch (slot_desc->type().type) { - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_STRING: { - if (val_idx >= col.string_vals.size()) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "STRING")); - } - const string& val = col.string_vals[val_idx]; - size_t val_size = val.size(); - Status rst; - char* buffer = - 
reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst)); - if (UNLIKELY(buffer == nullptr)) { - std::string details = strings::Substitute( - ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot"); - RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, val_size, rst); - } - memcpy(buffer, val.data(), val_size); - reinterpret_cast<StringValue*>(slot)->ptr = buffer; - reinterpret_cast<StringValue*>(slot)->len = val_size; - break; - } - case TYPE_TINYINT: - if (val_idx >= col.byte_vals.size()) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "TINYINT")); - } - *reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx]; - break; - case TYPE_SMALLINT: - if (val_idx >= col.short_vals.size()) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "SMALLINT")); - } - *reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx]; - break; - case TYPE_INT: - if (val_idx >= col.int_vals.size()) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "INT")); - } - *reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx]; - break; - case TYPE_BIGINT: - if (val_idx >= col.long_vals.size()) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "BIGINT")); - } - *reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx]; - break; - case TYPE_LARGEINT: - if (val_idx >= col.long_vals.size()) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "LARGEINT")); - } - *reinterpret_cast<int128_t*>(slot) = col.long_vals[val_idx]; - break; - case TYPE_DOUBLE: - if (val_idx >= col.double_vals.size()) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE")); - } - *reinterpret_cast<double*>(slot) = col.double_vals[val_idx]; - break; - case TYPE_FLOAT: - if (val_idx >= col.double_vals.size()) { - return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "FLOAT")); - } - 
*reinterpret_cast<float*>(slot) = col.double_vals[val_idx]; - break; - case TYPE_BOOLEAN: - if (val_idx >= col.bool_vals.size()) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN")); - } - *reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx]; - break; - case TYPE_DATE: - if (val_idx >= col.long_vals.size() || - !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx], - "+08:00")) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE")); - } - reinterpret_cast<DateTimeValue*>(slot)->cast_to_date(); - break; - case TYPE_DATETIME: { - if (val_idx >= col.long_vals.size() || - !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx], - "+08:00")) { - return Status::InternalError( - strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME")); - } - reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME); - break; - } - default: - DCHECK(false); - } - } - return Status::OK(); -} - -} // namespace doris diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h deleted file mode 100644 index 173cddaf68..0000000000 --- a/be/src/exec/es_scan_node.h +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <memory> -#include <vector> - -#include "exec/scan_node.h" -#include "exprs/slot_ref.h" -#include "gen_cpp/PaloExternalDataSourceService_types.h" -#include "gen_cpp/TExtDataSourceService.h" -#include "runtime/descriptors.h" -#include "runtime/exec_env.h" -#include "runtime/tuple.h" - -namespace doris { - -class TupleDescriptor; -class RuntimeState; -class Status; - -class EsScanNode : public ScanNode { -public: - EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~EsScanNode(); - - virtual Status prepare(RuntimeState* state) override; - virtual Status open(RuntimeState* state) override; - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; - virtual Status close(RuntimeState* state) override; - virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; - -protected: - // Write debug string of this into out. - virtual void debug_string(int indentation_level, std::stringstream* out) const override; - -private: - Status open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params); - Status materialize_row(MemPool* tuple_pool, Tuple* tuple, const vector<TExtColumnData>& cols, - int next_row_idx, vector<int>& cols_next_val_idx); - Status get_next_from_es(TExtGetNextResult& result); - - bool get_disjuncts(ExprContext* context, Expr* conjunct, vector<TExtPredicate>& disjuncts); - bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal); - bool to_ext_literal(PrimitiveType node_type, void* value, TExtLiteral* literal); - bool ignore_cast(SlotDescriptor* slot, Expr* expr); - - bool is_match_func(Expr* conjunct); - - SlotDescriptor* get_slot_desc(SlotRef* slotRef); - - // check if open result meets condition - // 1. 
check if left conjuncts contain "match" function, since match function could only be executed on es - bool check_left_conjuncts(Expr* conjunct); - -private: - TupleId _tuple_id; - std::map<std::string, std::string> _properties; - const TupleDescriptor* _tuple_desc; - ExecEnv* _env; - std::vector<TEsScanRange> _scan_ranges; - - // scan range's iterator, used in get_next() - int _scan_range_idx; - - // store every scan range's netaddress/handle/offset - std::vector<TNetworkAddress> _addresses; - std::vector<std::string> _scan_handles; - std::vector<int> _offsets; - std::vector<ExprContext*> _pushdown_conjunct_ctxs; -}; - -} // namespace doris diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4dbd827605..ccd8e3c854 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -33,7 +33,6 @@ #include "exec/cross_join_node.h" #include "exec/empty_set_node.h" #include "exec/es_http_scan_node.h" -#include "exec/es_scan_node.h" #include "exec/except_node.h" #include "exec/exchange_node.h" #include "exec/hash_join_node.h" @@ -428,10 +427,6 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new OdbcScanNode(pool, tnode, descs)); return Status::OK(); - case TPlanNodeType::ES_SCAN_NODE: - *node = pool->add(new EsScanNode(pool, tnode, descs)); - return Status::OK(); - case TPlanNodeType::ES_HTTP_SCAN_NODE: if (state->enable_vectorized_exec()) { *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs)); @@ -662,7 +657,6 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) { collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes); - collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes); collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); } diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index 32d408e99d..7be7a8f15f 
100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -160,7 +160,6 @@ private: friend class RuntimePredicateWrapper; friend class BloomFilterPredicate; friend class OlapScanNode; - friend class EsScanNode; friend class EsPredicate; /// FunctionContexts for each registered expression. The FunctionContexts are created diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt index 22aa8c9cfe..c4b2f0dc14 100644 --- a/be/src/gen_cpp/CMakeLists.txt +++ b/be/src/gen_cpp/CMakeLists.txt @@ -37,9 +37,6 @@ set(SRC_FILES ${GEN_CPP_DIR}/HeartbeatService_types.cpp ${GEN_CPP_DIR}/PaloInternalService_constants.cpp ${GEN_CPP_DIR}/PaloInternalService_types.cpp - ${GEN_CPP_DIR}/PaloExternalDataSourceService_constants.cpp - ${GEN_CPP_DIR}/PaloExternalDataSourceService_types.cpp - ${GEN_CPP_DIR}/TExtDataSourceService.cpp ${GEN_CPP_DIR}/FrontendService.cpp ${GEN_CPP_DIR}/FrontendService_constants.cpp ${GEN_CPP_DIR}/FrontendService_types.cpp diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index e29ecc632e..418917ab29 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -289,8 +289,5 @@ using FrontendServiceConnection = ClientConnection<FrontendServiceClient>; class TPaloBrokerServiceClient; using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>; using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>; -class TExtDataSourceServiceClient; -using ExtDataSourceServiceClientCache = ClientCache<TExtDataSourceServiceClient>; -using ExtDataSourceServiceConnection = ClientConnection<TExtDataSourceServiceClient>; } // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4a279d8142..6fda2e6a1f 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -64,7 +64,6 @@ class SmallFileMgr; class BackendServiceClient; class FrontendServiceClient; class TPaloBrokerServiceClient; -class TExtDataSourceServiceClient; class 
PBackendService_Stub; class PFunctionService_Stub; @@ -108,9 +107,6 @@ public: ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; } ClientCache<FrontendServiceClient>* frontend_client_cache() { return _frontend_client_cache; } ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; } - ClientCache<TExtDataSourceServiceClient>* extdatasource_client_cache() { - return _extdatasource_client_cache; - } // using template to simplify client cache management template <typename T> @@ -184,7 +180,6 @@ private: ClientCache<BackendServiceClient>* _backend_client_cache = nullptr; ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr; ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr; - ClientCache<TExtDataSourceServiceClient>* _extdatasource_client_cache = nullptr; ThreadResourceMgr* _thread_mgr = nullptr; // The ancestor for all querys tracker. @@ -248,10 +243,5 @@ inline ClientCache<TPaloBrokerServiceClient>* ExecEnv::get_client_cache<TPaloBrokerServiceClient>() { return _broker_client_cache; } -template <> -inline ClientCache<TExtDataSourceServiceClient>* -ExecEnv::get_client_cache<TExtDataSourceServiceClient>() { - return _extdatasource_client_cache; -} } // namespace doris diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0c9a6edc84..ff5b847810 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -20,7 +20,6 @@ #include "common/logging.h" #include "gen_cpp/BackendService.h" #include "gen_cpp/HeartbeatService_types.h" -#include "gen_cpp/TExtDataSourceService.h" #include "gen_cpp/TPaloBrokerService.h" #include "olap/page_cache.h" #include "olap/segment_loader.h" @@ -95,8 +94,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); _frontend_client_cache = new 
FrontendServiceClientCache(config::max_client_cache_size_per_host); _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host); - _extdatasource_client_cache = - new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host); _task_pool_mem_tracker_registry.reset(new MemTrackerTaskPool()); _thread_mgr = new ThreadResourceMgr(); if (config::doris_enable_scanner_thread_pool_per_disk && @@ -148,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); _broker_client_cache->init_metrics("broker"); - _extdatasource_client_cache->init_metrics("extdatasource"); _result_mgr->init(); _cgroups_mgr->init_cgroups(); _etl_job_mgr->init(); @@ -323,7 +319,6 @@ void ExecEnv::_destroy() { SAFE_DELETE(_scan_thread_pool); SAFE_DELETE(_thread_mgr); SAFE_DELETE(_broker_client_cache); - SAFE_DELETE(_extdatasource_client_cache); SAFE_DELETE(_frontend_client_cache); SAFE_DELETE(_backend_client_cache); SAFE_DELETE(_result_mgr); diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp index 1083df6bb4..94ad060250 100644 --- a/be/src/util/thrift_rpc_helper.cpp +++ b/be/src/util/thrift_rpc_helper.cpp @@ -96,9 +96,4 @@ template Status ThriftRpcHelper::rpc<TPaloBrokerServiceClient>( const std::string& ip, const int32_t port, std::function<void(ClientConnection<TPaloBrokerServiceClient>&)> callback, int timeout_ms); -template Status ThriftRpcHelper::rpc<TExtDataSourceServiceClient>( - const std::string& ip, const int32_t port, - std::function<void(ClientConnection<TExtDataSourceServiceClient>&)> callback, - int timeout_ms); - } // namespace doris diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 6b79813060..f39cb68926 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -86,7 +86,6 @@ set(EXEC_TEST_FILES # exec/schema_scanner/schema_collations_scanner_test.cpp # 
exec/schema_scanner/schema_charsets_scanner_test.cpp # exec/broker_reader_test.cpp - # exec/es_scan_node_test.cpp ) if(DEFINED DORIS_WITH_LZO) diff --git a/be/test/exec/es_scan_node_test.cpp b/be/test/exec/es_scan_node_test.cpp deleted file mode 100644 index c647170f53..0000000000 --- a/be/test/exec/es_scan_node_test.cpp +++ /dev/null @@ -1,147 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "exec/es_scan_node.h" - -#include <gtest/gtest.h> - -#include <string> - -#include "common/object_pool.h" -#include "gen_cpp/PlanNodes_types.h" -#include "runtime/descriptors.h" -#include "runtime/mem_pool.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "runtime/string_value.h" -#include "runtime/tuple_row.h" -#include "util/debug_util.h" -#include "util/runtime_profile.h" - -using std::vector; - -namespace doris { - -// mock -class EsScanNodeTest : public testing::Test { -public: - EsScanNodeTest() : _runtime_state(TQueryGlobals()) { - _runtime_state._instance_mem_tracker.reset(new MemTracker()); - TDescriptorTable t_desc_table; - - // table descriptors - TTableDescriptor t_table_desc; - - t_table_desc.id = 0; - t_table_desc.tableType = TTableType::ES_TABLE; - t_table_desc.numCols = 0; - t_table_desc.numClusteringCols = 0; - t_table_desc.__isset.esTable = true; - t_desc_table.tableDescriptors.push_back(t_table_desc); - t_desc_table.__isset.tableDescriptors = true; - // TSlotDescriptor - int offset = 1; - int i = 0; - // id - { - TSlotDescriptor t_slot_desc; - t_slot_desc.__set_slotType(TypeDescriptor(TYPE_INT).to_thrift()); - t_slot_desc.__set_columnPos(i); - t_slot_desc.__set_byteOffset(offset); - t_slot_desc.__set_nullIndicatorByte(0); - t_slot_desc.__set_nullIndicatorBit(-1); - t_slot_desc.__set_slotIdx(i); - t_slot_desc.__set_isMaterialized(true); - t_desc_table.slotDescriptors.push_back(t_slot_desc); - offset += sizeof(int); - } - - TTupleDescriptor t_tuple_desc; - t_tuple_desc.id = 0; - t_tuple_desc.byteSize = offset; - t_tuple_desc.numNullBytes = 1; - t_tuple_desc.tableId = 0; - t_tuple_desc.__isset.tableId = true; - t_desc_table.__isset.slotDescriptors = true; - t_desc_table.tupleDescriptors.push_back(t_tuple_desc); - - DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); - _runtime_state.set_desc_tbl(_desc_tbl); - - // Node Id - _tnode.node_id = 0; - _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE; - 
_tnode.num_children = 0; - _tnode.limit = -1; - _tnode.row_tuples.push_back(0); - _tnode.nullable_tuples.push_back(false); - _tnode.es_scan_node.tuple_id = 0; - std::map<std::string, std::string> properties; - _tnode.es_scan_node.__set_properties(properties); - _tnode.__isset.es_scan_node = true; - } - -protected: - virtual void SetUp() {} - virtual void TearDown() {} - TPlanNode _tnode; - ObjectPool _obj_pool; - DescriptorTbl* _desc_tbl; - RuntimeState _runtime_state; -}; - -TEST_F(EsScanNodeTest, normal_use) { - EsScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); - Status status = scan_node.prepare(&_runtime_state); - EXPECT_TRUE(status.ok()); - TEsScanRange es_scan_range; - es_scan_range.__set_index("index1"); - es_scan_range.__set_type("docs"); - es_scan_range.__set_shard_id(0); - TNetworkAddress es_host; - es_host.__set_hostname("host"); - es_host.__set_port(8200); - std::vector<TNetworkAddress> es_hosts; - es_hosts.push_back(es_host); - es_scan_range.__set_es_hosts(es_hosts); - TScanRange scan_range; - scan_range.__set_es_scan_range(es_scan_range); - TScanRangeParams scan_range_params; - scan_range_params.__set_scan_range(scan_range); - std::vector<TScanRangeParams> scan_ranges; - scan_ranges.push_back(scan_range_params); - - status = scan_node.set_scan_ranges(scan_ranges); - EXPECT_TRUE(status.ok()); - std::stringstream out; - scan_node.debug_string(1, &out); - LOG(WARNING) << out.str(); - - status = scan_node.open(&_runtime_state); - EXPECT_TRUE(status.ok()); - RowBatch row_batch(scan_node._row_descriptor, _runtime_state.batch_size()); - bool eos = false; - status = scan_node.get_next(&_runtime_state, &row_batch, &eos); - EXPECT_TRUE(status.ok()); - EXPECT_EQ(2, row_batch.num_rows()); - EXPECT_TRUE(eos); - - status = scan_node.close(&_runtime_state); - EXPECT_TRUE(status.ok()); -} - -} // namespace doris diff --git a/gensrc/thrift/PaloExternalDataSourceService.thrift b/gensrc/thrift/PaloExternalDataSourceService.thrift deleted file mode 100644 index 
c4cce76d19..0000000000 --- a/gensrc/thrift/PaloExternalDataSourceService.thrift +++ /dev/null @@ -1,250 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -namespace java org.apache.doris.thrift -namespace cpp doris - -include "Exprs.thrift" -include "Opcodes.thrift" -include "Status.thrift" -include "Types.thrift" - -// A result set column descriptor. -// this definition id different from column desc in palo, the column desc in palo only support scalar type, does not support map, array -// so that should convert palo column desc into ExtColumnDesc -struct TExtColumnDesc { - // The column name as given in the Create .. statement. Always set. - 1: optional string name - // The column type. Always set. - 2: optional Types.TTypeDesc type -} - -// Metadata used to describe the schema (column names, types, comments) -// of result sets. -struct TExtTableSchema { - // List of columns. Always set. 
- 1: optional list<TExtColumnDesc> cols -} - -struct TExtLiteral { - 1: required Exprs.TExprNodeType node_type - 2: optional Exprs.TBoolLiteral bool_literal - 3: optional Exprs.TDateLiteral date_literal - 4: optional Exprs.TFloatLiteral float_literal - 5: optional Exprs.TIntLiteral int_literal - 6: optional Exprs.TStringLiteral string_literal - 7: optional Exprs.TDecimalLiteral decimal_literal - 8: optional Exprs.TLargeIntLiteral large_int_literal -} - -// Binary predicates that can be pushed to the external data source and -// are of the form <col> <op> <val>. Sources can choose to accept or reject -// predicates via the return value of prepare(), see TPrepareResult. -// The column and the value are guaranteed to be type compatible in Impala, -// but they are not necessarily the same type, so the data source -// implementation may need to do an implicit cast. -// > < = != >= <= -struct TExtBinaryPredicate { - // Column on which the predicate is applied. Always set. - 1: optional TExtColumnDesc col - // Comparison operator. Always set. - 2: optional Opcodes.TExprOpcode op - // Value on the right side of the binary predicate. Always set. - 3: optional TExtLiteral value -} - -struct TExtInPredicate { - 1: optional bool is_not_in - // Column on which the predicate is applied. Always set. - 2: optional TExtColumnDesc col - // Value on the right side of the binary predicate. Always set. 
- 3: optional list<TExtLiteral> values -} - -struct TExtLikePredicate { - 1: optional TExtColumnDesc col - 2: optional TExtLiteral value -} - -struct TExtIsNullPredicate { - 1: optional bool is_not_null - 2: optional TExtColumnDesc col -} - -struct TExtFunction { - 1: optional string func_name - // input parameter column descs - 2: optional list<TExtColumnDesc> cols - // input parameter column literals - 3: optional list<TExtLiteral> values -} - -// a union of all predicates -struct TExtPredicate { - 1: required Exprs.TExprNodeType node_type - 2: optional TExtBinaryPredicate binary_predicate - 3: optional TExtInPredicate in_predicate - 4: optional TExtLikePredicate like_predicate - 5: optional TExtIsNullPredicate is_null_predicate - 6: optional TExtFunction ext_function -} - -// A union over all possible return types for a column of data -// Currently only used by ExternalDataSource types -// -struct TExtColumnData { - // One element in the list for every row in the column indicating if there is - // a value in the vals list or a null. - 1: required list<bool> is_null; - - // Only one is set, only non-null values are set. this indicates one column data for a row batch - 2: optional list<bool> bool_vals; - 3: optional binary byte_vals; - 4: optional list<i16> short_vals; - 5: optional list<i32> int_vals; - 6: optional list<i64> long_vals; - 7: optional list<double> double_vals; - 8: optional list<string> string_vals; - 9: optional list<binary> binary_vals; -} - -// Serialized batch of rows returned by getNext(). -// one row batch contains mult rows, and the result is arranged in column style -struct TExtRowBatch { - // Each TColumnData contains the data for an entire column. Always set. - 1: optional list<TExtColumnData> cols - - // The number of rows returned. For count(*) queries, there may not be - // any materialized columns so cols will be an empty list and this value - // will indicate how many rows are returned. 
When there are materialized - // columns, this number should be the same as the size of each - // TColumnData.is_null list. - 2: optional i64 num_rows -} - -// Parameters to prepare(). -struct TExtPrepareParams { - // The name of the table. Always set. - 1: optional string table_name - - // A string specified for the table that is passed to the external data source. - // Always set, may be an empty string. - 2: optional string init_string - - // A list of conjunctive (AND) clauses, each of which contains a list of - // disjunctive (OR) binary predicates. Always set, may be an empty list. - 3: optional list<list<TExtPredicate>> predicates -} - -// Returned by prepare(). -struct TExtPrepareResult { - 1: required Status.TStatus status - - // Estimate of the total number of rows returned when applying the predicates indicated - // by accepted_conjuncts. Not set if the data source does not support providing - // this statistic. - 2: optional i64 num_rows_estimate - - // Accepted conjuncts. References the 'predicates' parameter in the prepare() - // call. It contains the 0-based indices of the top-level list elements (the - // AND elements) that the library will be able to apply during the scan. Those - // elements that aren’t referenced in accepted_conjuncts will be evaluated by - // Impala itself. - 3: optional list<i32> accepted_conjuncts -} - -// Parameters to open(). -struct TExtOpenParams { - // A unique identifier for the query. Always set. - 1: optional Types.TUniqueId query_id - - // The name of the table. Always set. - 2: optional string table_name - - // A string specified for the table that is passed to the external data source. - // Always set, may be an empty string. - 3: optional map<string,string> properties - - // The authenticated user name. Always set. - 4: optional string authenticated_user_name - - // The schema of the rows that the scan needs to return. Always set. 
- 5: optional TExtTableSchema row_schema - - // The expected size of the row batches it returns in the subsequent getNext() calls. - // Always set. - 6: optional i32 batch_size - - 7: optional list<list<TExtPredicate>> predicates - - // The query limit, if specified. - 8: optional i64 limit -} - -// Returned by open(). -struct TExtOpenResult { - 1: required Status.TStatus status - - // An opaque handle used in subsequent getNext()/close() calls. Required. - 2: optional string scan_handle - 3: optional list<i32> accepted_conjuncts -} - -// Parameters to getNext() -struct TExtGetNextParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string scan_handle // es search context id - 2: optional i64 offset // es should check the offset to prevent duplicate rpc calls -} - -// Returned by getNext(). -struct TExtGetNextResult { - 1: required Status.TStatus status - - // If true, reached the end of the result stream; subsequent calls to - // getNext() won’t return any more results. Required. - 2: optional bool eos - - // A batch of rows to return, if any exist. The number of rows in the batch - // should be less than or equal to the batch_size specified in TOpenParams. - 3: optional TExtRowBatch rows -} - -// Parameters to close() -struct TExtCloseParams { - // The opaque handle returned by the previous open() call. Always set. - 1: optional string scan_handle -} - -// Returned by close(). -struct TExtCloseResult { - 1: required Status.TStatus status -} - -// This data source can be considered as the entry of palo's unified external data source -service TExtDataSourceService { - // 1. palo be call this api to send index, type, shard id to es - // 2. es will open a search context and prepare data, will return a context id - TExtOpenResult open(1: TExtOpenParams params); - // 1. palo be will send a search context id to es - // 2. es will find the search context and find a batch rows and send to palo - // 3. 
palo will run the remaining predicates when receving data - // 4. es should check the offset when receive the request - TExtGetNextResult getNext(1: TExtGetNextParams params); - // 1. es will release the context when receiving the data - TExtCloseResult close(1: TExtCloseParams params); -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org