This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5142f30e803919773975959c4d5077df08f1d911 Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Sat Jan 14 10:28:59 2023 +0800 [Enhencement](jdbc scanner) add profile for jdbc scanner (#15914) --- be/src/olap/schema_change.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 13 ++++++++++-- be/src/vec/exec/scan/new_jdbc_scanner.h | 10 +++++++++ be/src/vec/exec/scan/vscan_node.h | 1 + be/src/vec/exec/vjdbc_connector.cpp | 34 +++++++++++++++++++++++-------- be/src/vec/exec/vjdbc_connector.h | 6 ++++++ 6 files changed, 55 insertions(+), 11 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 55eb3e8203..232b85bd8e 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -2185,7 +2185,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams } if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) { - res = Status::Error<SCHEMA_SCHEMA_INVALID>( + res = Status::InternalError( "Don't support to add materialized view by linked schema change"); return process_alter_exit(); } diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 377ae6c800..80fc3669c8 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -17,6 +17,8 @@ #include "new_jdbc_scanner.h" +#include "util/runtime_profile.h" + namespace doris::vectorized { NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, const TupleId& tuple_id, const std::string& query_string, @@ -27,7 +29,14 @@ NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int _tuple_id(tuple_id), _query_string(query_string), _tuple_desc(nullptr), - _table_type(table_type) {} + _table_type(table_type) { + _load_jar_timer = ADD_TIMER(get_parent()->_scanner_profile, "LoadJarTime"); + _init_connector_timer = ADD_TIMER(get_parent()->_scanner_profile, "InitConnectorTime"); + _check_type_timer = ADD_TIMER(get_parent()->_scanner_profile, "CheckTypeTime"); + _get_data_timer = ADD_TIMER(get_parent()->_scanner_profile, "GetDataTime"); + _execte_read_timer = ADD_TIMER(get_parent()->_scanner_profile, "ExecteReadTime"); + _connector_close_timer = ADD_TIMER(get_parent()->_scanner_profile, "ConnectorCloseTime"); +} Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { VLOG_CRITICAL << "NewJdbcScanner::Prepare"; @@ -67,7 +76,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx _jdbc_param.query_string = std::move(_query_string); _jdbc_param.table_type = _table_type; - _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(this, _jdbc_param)); if (_jdbc_connector == nullptr) { return Status::InternalError("new a jdbc scanner failed."); } diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index e88b33d252..4f869d0f41 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -18,6 +18,7 @@ #pragma once #include "runtime/runtime_state.h" +#include "util/runtime_profile.h" #include "vec/exec/scan/new_jdbc_scan_node.h" #include "vec/exec/scan/vscanner.h" #include "vec/exec/vjdbc_connector.h" @@ -25,6 +26,8 @@ namespace doris { namespace vectorized { class NewJdbcScanner : public VScanner { public: + friend class JdbcConnector; + NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit, const TupleId& tuple_id, const std::string& query_string, TOdbcTableType::type table_type, RuntimeProfile* profile); @@ -37,6 +40,13 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; + RuntimeProfile::Counter* _load_jar_timer = nullptr; + RuntimeProfile::Counter* _init_connector_timer = nullptr; + RuntimeProfile::Counter* _get_data_timer = nullptr; + RuntimeProfile::Counter* _check_type_timer = nullptr; + RuntimeProfile::Counter* _execte_read_timer = nullptr; + RuntimeProfile::Counter* _connector_close_timer = nullptr; + private: bool _is_init; diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 8abd4b5e19..251c5dd50f 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -48,6 +48,7 @@ public: friend class VScanner; friend class NewOlapScanner; friend class VFileScanner; + friend class NewJdbcScanner; friend class ScannerContext; Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 090cb6e0b6..b7c2b388be 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -25,10 +25,12 @@ #include "runtime/define_primitive_type.h" #include "runtime/user_function_cache.h" #include "util/jni-util.h" +#include "util/runtime_profile.h" #include "vec/columns/column_array.h" #include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_string.h" +#include "vec/exec/scan/new_jdbc_scanner.h" #include "vec/functions/simple_function_factory.h" namespace doris { @@ -51,6 +53,12 @@ JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) _conn_param(param), _closed(false) {} +JdbcConnector::JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param) + : TableConnector(param.tuple_desc, param.query_string), + _jdbc_scanner(jdbc_scanner), + _conn_param(param), + _closed(false) {} + JdbcConnector::~JdbcConnector() { if (!_closed) { close(); @@ -63,6 +71,7 @@ JdbcConnector::~JdbcConnector() { #define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz); Status JdbcConnector::close() { + SCOPED_TIMER(_jdbc_scanner->_connector_close_timer); _closed = true; if (!_is_open) { return Status::OK(); @@ -123,10 +132,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { if (_conn_param.resource_name.empty()) { // for jdbcExternalTable, _conn_param.resource_name == "" // so, we use _conn_param.driver_path as key of jarpath + SCOPED_TIMER(_jdbc_scanner->_load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path, _conn_param.driver_checksum, &local_location)); } else { + SCOPED_TIMER(_jdbc_scanner->_load_jar_timer); RETURN_IF_ERROR(function_cache->get_jarpath( std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path, _conn_param.driver_checksum, &local_location)); @@ -146,8 +157,10 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { // Pushed frame will be popped when jni_frame goes out-of-scope. RETURN_IF_ERROR(jni_frame.push(env)); RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); - _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes); - + { + SCOPED_TIMER(_jdbc_scanner->_init_connector_timer); + _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes); + } jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr); env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT); env->DeleteLocalRef(ctor_params_bytes); @@ -172,19 +185,23 @@ Status JdbcConnector::query() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); - jint colunm_count = - env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id); - RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); - - if (colunm_count != materialize_num) { - return Status::InternalError("input and output column num not equal of jdbc query."); + { + SCOPED_TIMER(_jdbc_scanner->_execte_read_timer); + jint colunm_count = + env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id); + RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env)); + if (colunm_count != materialize_num) { + return Status::InternalError("input and output column num not equal of jdbc query."); + } } + LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str; RETURN_IF_ERROR(_check_column_type()); return Status::OK(); } Status JdbcConnector::_check_column_type() { + SCOPED_TIMER(_jdbc_scanner->_check_type_timer); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); jobject type_lists = @@ -332,6 +349,7 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns if (!_is_open) { return Status::InternalError("get_next before open of jdbc connector."); } + SCOPED_TIMER(_jdbc_scanner->_get_data_timer); JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); jboolean has_next = diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index e1b61fefea..ee99be8ec5 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -28,6 +28,9 @@ namespace doris { namespace vectorized { + +class NewJdbcScanner; + struct JdbcConnectorParam { std::string driver_path; std::string driver_class; @@ -46,6 +49,8 @@ class JdbcConnector : public TableConnector { public: JdbcConnector(const JdbcConnectorParam& param); + JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param); + ~JdbcConnector() override; Status open(RuntimeState* state, bool read = false) override; @@ -84,6 +89,7 @@ private: Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index, int rows); + NewJdbcScanner* _jdbc_scanner; const JdbcConnectorParam& _conn_param; //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER std::map<int, PrimitiveType> _arr_jdbc_map { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org