This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 82251a6bab [refactor] some refactor of delete predicates (#10816) 82251a6bab is described below commit 82251a6bab1b7bcd4c026af4e6fd28dd57613367 Author: yiguolei <676222...@qq.com> AuthorDate: Fri Jul 15 14:13:34 2022 +0800 [refactor] some refactor of delete predicates (#10816) --- be/src/olap/delete_handler.cpp | 22 +- be/src/olap/delete_handler.h | 49 ++- be/src/olap/olap_cond.cpp | 63 --- be/src/olap/olap_cond.h | 6 - be/src/olap/push_handler.cpp | 5 +- be/src/olap/storage_migration_v2.cpp | 473 --------------------- be/src/olap/storage_migration_v2.h | 79 ---- be/src/olap/tablet.h | 4 +- be/src/olap/tablet_meta.cpp | 41 +- be/src/olap/tablet_meta.h | 6 +- .../olap/task/engine_storage_migration_task_v2.cpp | 60 --- .../olap/task/engine_storage_migration_task_v2.h | 44 -- be/test/olap/delete_handler_test.cpp | 186 ++++---- 13 files changed, 150 insertions(+), 888 deletions(-) diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp index 99c6087575..358262965b 100644 --- a/be/src/olap/delete_handler.cpp +++ b/be/src/olap/delete_handler.cpp @@ -48,9 +48,9 @@ using google::protobuf::RepeatedPtrField; namespace doris { -Status DeleteConditionHandler::generate_delete_predicate(const TabletSchema& schema, - const std::vector<TCondition>& conditions, - DeletePredicatePB* del_pred) { +Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema, + const std::vector<TCondition>& conditions, + DeletePredicatePB* del_pred) { if (conditions.empty()) { LOG(WARNING) << "invalid parameters for store_cond." << " condition_size=" << conditions.size(); @@ -89,7 +89,7 @@ Status DeleteConditionHandler::generate_delete_predicate(const TabletSchema& sch return Status::OK(); } -std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& condition) { +std::string DeleteHandler::construct_sub_predicates(const TCondition& condition) { string op = condition.condition_op; if (op == "<") { op += "<"; @@ -110,9 +110,9 @@ std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& c return condition_str; } -bool DeleteConditionHandler::is_condition_value_valid(const TabletColumn& column, - const std::string& condition_op, - const string& value_str) { +bool DeleteHandler::is_condition_value_valid(const TabletColumn& column, + const std::string& condition_op, + const string& value_str) { if ("IS" == condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) { return true; } @@ -162,8 +162,7 @@ bool DeleteConditionHandler::is_condition_value_valid(const TabletColumn& column return false; } -Status DeleteConditionHandler::check_condition_valid(const TabletSchema& schema, - const TCondition& cond) { +Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TCondition& cond) { // Check whether the column exists int32_t field_index = schema.field_index(cond.column_name); if (field_index < 0) { @@ -237,8 +236,9 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio return true; } -Status DeleteHandler::init(const TabletSchema& schema, const DelPredicateArray& delete_conditions, - int64_t version, const TabletReader* reader) { +Status DeleteHandler::init(const TabletSchema& schema, + const std::vector<DeletePredicatePB>& delete_conditions, int64_t version, + const TabletReader* reader) { DCHECK(!_is_inited) << "reinitialize delete handler."; DCHECK(version >= 0) << "invalid parameters. version=" << version; diff --git a/be/src/olap/delete_handler.h b/be/src/olap/delete_handler.h index afd8bae410..c04dbadcb5 100644 --- a/be/src/olap/delete_handler.h +++ b/be/src/olap/delete_handler.h @@ -28,35 +28,11 @@ namespace doris { -using DelPredicateArray = google::protobuf::RepeatedPtrField<DeletePredicatePB>; class Conditions; class RowCursor; class TabletReader; class TabletSchema; -class DeleteConditionHandler { -public: - // generated DeletePredicatePB by TCondition - Status generate_delete_predicate(const TabletSchema& schema, - const std::vector<TCondition>& conditions, - DeletePredicatePB* del_pred); - - // construct sub condition from TCondition - std::string construct_sub_predicates(const TCondition& condition); - -private: - // Validate the condition on the schema. - Status check_condition_valid(const TabletSchema& tablet_schema, const TCondition& cond); - - // Check whether the condition value is valid according to its type. - // 1. For integers(int8,int16,in32,int64,uint8,uint16,uint32,uint64), check whether they are overflow - // 2. For decimal, check whether precision or scale is overflow - // 3. For date and datetime, check format and value - // 4. For char and varchar, check length - bool is_condition_value_valid(const TabletColumn& column, const std::string& condition_op, - const std::string& value_str); -}; - // Represent a delete condition. struct DeleteConditions { int64_t filter_version = 0; // The version of this condition @@ -80,6 +56,29 @@ struct DeleteConditions { // NOTE: // * In the first step, before calling delete_handler.init(), you should lock the tablet's header file. class DeleteHandler { + // These static method is used to generate delete predicate pb during write or push handler +public: + // generated DeletePredicatePB by TCondition + static Status generate_delete_predicate(const TabletSchema& schema, + const std::vector<TCondition>& conditions, + DeletePredicatePB* del_pred); + + // construct sub condition from TCondition + static std::string construct_sub_predicates(const TCondition& condition); + +private: + // Validate the condition on the schema. + static Status check_condition_valid(const TabletSchema& tablet_schema, const TCondition& cond); + + // Check whether the condition value is valid according to its type. + // 1. For integers(int8,int16,in32,int64,uint8,uint16,uint32,uint64), check whether they are overflow + // 2. For decimal, check whether precision or scale is overflow + // 3. For date and datetime, check format and value + // 4. For char and varchar, check length + static bool is_condition_value_valid(const TabletColumn& column, + const std::string& condition_op, + const std::string& value_str); + public: DeleteHandler() = default; ~DeleteHandler() { finalize(); } @@ -94,7 +93,7 @@ public: // return: // * Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS): input parameters are not valid // * Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR): alloc memory failed - Status init(const TabletSchema& schema, const DelPredicateArray& delete_conditions, + Status init(const TabletSchema& schema, const std::vector<DeletePredicatePB>& delete_conditions, int64_t version, const doris::TabletReader* = nullptr); // Check whether a row should be deleted. diff --git a/be/src/olap/olap_cond.cpp b/be/src/olap/olap_cond.cpp index 558ad98bf5..3a478a665d 100644 --- a/be/src/olap/olap_cond.cpp +++ b/be/src/olap/olap_cond.cpp @@ -629,69 +629,6 @@ bool Conditions::delete_conditions_eval(const RowCursor& row) const { return true; } -bool Conditions::rowset_pruning_filter(const std::vector<KeyRange>& zone_maps) const { - // ZoneMap will store min/max of rowset. - // The function is to filter rowset using ZoneMaps - // and query predicates. - for (auto& cond_it : _columns) { - if (_cond_column_is_key_or_duplicate(cond_it.second)) { - if (cond_it.first < zone_maps.size() && - !cond_it.second->eval(zone_maps.at(cond_it.first))) { - return true; - } - } - } - return false; -} - -int Conditions::delete_pruning_filter(const std::vector<KeyRange>& zone_maps) const { - if (_columns.empty()) { - return DEL_NOT_SATISFIED; - } - // ZoneMap and DeletePredicate are all stored in TabletMeta. - // This function is to filter rowset using ZoneMap and Delete Predicate. - /* - * the relationship between condcolumn A and B is A & B. - * if any delete condition is not satisfied, the data can't be filtered. - * elseif all delete condition is satisfied, the data can be filtered. - * else is the partial satisfied. - */ - int ret = DEL_NOT_SATISFIED; - bool del_partial_satisfied = false; - bool del_not_satisfied = false; - for (auto& cond_it : _columns) { - /* - * this is base on the assumption that the delete condition - * is only about key field, not about value field except the storage model is duplicate. - */ - if (_cond_column_is_key_or_duplicate(cond_it.second) && cond_it.first > zone_maps.size()) { - LOG(WARNING) << "where condition not equal column statistics size. " - << "cond_id=" << cond_it.first << ", zone_map_size=" << zone_maps.size(); - del_partial_satisfied = true; - continue; - } - - int del_ret = cond_it.second->del_eval(zone_maps.at(cond_it.first)); - if (DEL_SATISFIED == del_ret) { - continue; - } else if (DEL_PARTIAL_SATISFIED == del_ret) { - del_partial_satisfied = true; - } else { - del_not_satisfied = true; - break; - } - } - - if (del_not_satisfied) { - ret = DEL_NOT_SATISFIED; - } else if (del_partial_satisfied) { - ret = DEL_PARTIAL_SATISFIED; - } else { - ret = DEL_SATISFIED; - } - return ret; -} - CondColumn* Conditions::get_column(int32_t cid) const { auto iter = _columns.find(cid); if (iter != _columns.end()) { diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h index 1282fae0f1..e8bcb7a977 100644 --- a/be/src/olap/olap_cond.h +++ b/be/src/olap/olap_cond.h @@ -175,12 +175,6 @@ public: // Return true means this row should be filtered out, otherwise return false bool delete_conditions_eval(const RowCursor& row) const; - // Return true if the rowset should be pruned - bool rowset_pruning_filter(const std::vector<KeyRange>& zone_maps) const; - - // Whether the rowset satisfied delete condition - int delete_pruning_filter(const std::vector<KeyRange>& zone_maps) const; - const CondColumns& columns() const { return _columns; } CondColumn* get_column(int32_t cid) const; diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 64e936739e..14490763cd 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -115,7 +115,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR } DeletePredicatePB del_pred; - DeleteConditionHandler del_cond_handler; auto tablet_schema = tablet_var.tablet->tablet_schema(); if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) { tablet_schema.clear_columns(); @@ -123,8 +122,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR tablet_schema.append_column(TabletColumn(column_desc)); } } - res = del_cond_handler.generate_delete_predicate(tablet_schema, - request.delete_conditions, &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions, + &del_pred); del_preds.push(del_pred); if (!res.ok()) { LOG(WARNING) << "fail to generate delete condition. res=" << res diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp deleted file mode 100644 index 2a5373969a..0000000000 --- a/be/src/olap/storage_migration_v2.cpp +++ /dev/null @@ -1,473 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/storage_migration_v2.h" - -#include <pthread.h> -#include <signal.h> - -#include <algorithm> -#include <vector> - -#include "agent/cgroups_mgr.h" -#include "common/resource_tls.h" -#include "env/env_util.h" -#include "olap/merger.h" -#include "olap/row.h" -#include "olap/row_block.h" -#include "olap/row_cursor.h" -#include "olap/rowset/rowset_factory.h" -#include "olap/rowset/rowset_id_generator.h" -#include "olap/storage_engine.h" -#include "olap/tablet.h" -#include "olap/wrapper_field.h" -#include "rapidjson/document.h" -#include "rapidjson/prettywriter.h" -#include "rapidjson/stringbuffer.h" -#include "runtime/exec_env.h" -#include "runtime/thread_context.h" -#include "util/defer_op.h" - -using std::deque; -using std::list; -using std::nothrow; -using std::pair; -using std::string; -using std::stringstream; -using std::vector; - -namespace doris { - -DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(storage_migration_mem_consumption, MetricUnit::BYTES, "", - mem_consumption, Labels({{"type", "storage_migration"}})); - -StorageMigrationV2Handler::StorageMigrationV2Handler() - : _mem_tracker(MemTracker::create_tracker( - -1, "StorageMigrationV2Handler", - StorageEngine::instance()->storage_migration_mem_tracker())) { - REGISTER_HOOK_METRIC(storage_migration_mem_consumption, - [this]() { return _mem_tracker->consumption(); }); -} - -StorageMigrationV2Handler::~StorageMigrationV2Handler() { - DEREGISTER_HOOK_METRIC(storage_migration_mem_consumption); -} - -Status StorageMigrationV2Handler::process_storage_migration_v2( - const TStorageMigrationReqV2& request) { - LOG(INFO) << "begin to do request storage_migration: base_tablet_id=" << request.base_tablet_id - << ", new_tablet_id=" << request.new_tablet_id - << ", migration_version=" << request.migration_version; - - TabletSharedPtr base_tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id); - if (base_tablet == nullptr) { - LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id; - return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); - } - // Lock schema_change_lock util schema change info is stored in tablet header - std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(), - std::try_to_lock); - if (!schema_change_lock.owns_lock()) { - LOG(WARNING) << "failed to obtain schema change lock. " - << "base_tablet=" << request.base_tablet_id; - return Status::OLAPInternalError(OLAP_ERR_TRY_LOCK_FAILED); - } - - Status res = _do_process_storage_migration_v2(request); - LOG(INFO) << "finished storage_migration process, res=" << res; - return res; -} - -Status StorageMigrationV2Handler::_do_process_storage_migration_v2( - const TStorageMigrationReqV2& request) { - Status res = Status::OK(); - TabletSharedPtr base_tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id); - if (base_tablet == nullptr) { - LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id; - return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); - } - - // new tablet has to exist - TabletSharedPtr new_tablet = - StorageEngine::instance()->tablet_manager()->get_tablet(request.new_tablet_id); - if (new_tablet == nullptr) { - LOG(WARNING) << "fail to find new tablet." - << " new_tablet=" << request.new_tablet_id; - return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND); - } - - // check if tablet's state is not_ready, if it is ready, it means the tablet already finished - // check whether the tablet's max continuous version == request.version - if (new_tablet->tablet_state() != TABLET_NOTREADY) { - res = _validate_migration_result(new_tablet, request); - LOG(INFO) << "tablet's state=" << new_tablet->tablet_state() - << " the convert job already finished, check its version" - << " res=" << res; - return res; - } - - LOG(INFO) << "finish to validate storage_migration request. begin to migrate data from base " - "tablet " - "to new tablet" - << " base_tablet=" << base_tablet->full_name() - << " new_tablet=" << new_tablet->full_name(); - - std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock); - if (!base_migration_rlock.owns_lock()) { - return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR); - } - std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock); - if (!new_migration_rlock.owns_lock()) { - return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR); - } - - std::vector<Version> versions_to_be_changed; - std::vector<RowsetReaderSharedPtr> rs_readers; - // delete handlers for new tablet - DeleteHandler delete_handler; - std::vector<ColumnId> return_columns; - - // begin to find deltas to convert from base tablet to new tablet so that - // obtain base tablet and new tablet's push lock and header write lock to prevent loading data - { - std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock()); - std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock()); - std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock()); - std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock()); - // check if the tablet has alter task - // if it has alter task, it means it is under old alter process - size_t num_cols = base_tablet->tablet_schema().num_columns(); - return_columns.resize(num_cols); - for (int i = 0; i < num_cols; ++i) { - return_columns[i] = i; - } - - // reader_context is stack variables, it's lifetime should keep the same - // with rs_readers - RowsetReaderContext reader_context; - reader_context.reader_type = READER_ALTER_TABLE; - reader_context.tablet_schema = &base_tablet->tablet_schema(); - reader_context.need_ordered_result = true; - reader_context.delete_handler = &delete_handler; - reader_context.return_columns = &return_columns; - // for schema change, seek_columns is the same to return_columns - reader_context.seek_columns = &return_columns; - reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); - reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; - - do { - // get history data to be converted and it will check if there is hold in base tablet - res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed); - if (!res.ok()) { - LOG(WARNING) << "fail to get version to be changed. res=" << res; - break; - } - - // should check the max_version >= request.migration_version, if not the convert is useless - RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); - if (max_rowset == nullptr || max_rowset->end_version() < request.migration_version) { - LOG(WARNING) << "base tablet's max version=" - << (max_rowset == nullptr ? 0 : max_rowset->end_version()) - << " is less than request version=" << request.migration_version; - res = Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST); - break; - } - // before calculating version_to_be_changed, - // remove all data from new tablet, prevent to rewrite data(those double pushed when wait) - LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite." - << " new_tablet=" << new_tablet->full_name(); - std::vector<RowsetSharedPtr> rowsets_to_delete; - std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; - new_tablet->acquire_version_and_rowsets(&version_rowsets); - for (auto& pair : version_rowsets) { - if (pair.first.second <= max_rowset->end_version()) { - rowsets_to_delete.push_back(pair.second); - } - } - std::vector<RowsetSharedPtr> empty_vec; - new_tablet->modify_rowsets(empty_vec, rowsets_to_delete); - // inherit cumulative_layer_point from base_tablet - // check if new_tablet.ce_point > base_tablet.ce_point? - new_tablet->set_cumulative_layer_point(-1); - // save tablet meta - new_tablet->save_meta(); - for (auto& rowset : rowsets_to_delete) { - // do not call rowset.remove directly, using gc thread to delete it - StorageEngine::instance()->add_unused_rowset(rowset); - } - - // init one delete handler - int32_t end_version = -1; - for (auto& version : versions_to_be_changed) { - if (version.second > end_version) { - end_version = version.second; - } - } - - res = delete_handler.init(base_tablet->tablet_schema(), - base_tablet->delete_predicates(), end_version); - if (!res.ok()) { - LOG(WARNING) << "init delete handler failed. base_tablet=" - << base_tablet->full_name() << ", end_version=" << end_version; - - // release delete handlers which have been inited successfully. - delete_handler.finalize(); - break; - } - - // acquire data sources correspond to history versions - base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers); - if (rs_readers.size() < 1) { - LOG(WARNING) << "fail to acquire all data sources. " - << "version_num=" << versions_to_be_changed.size() - << ", data_source_num=" << rs_readers.size(); - res = Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS); - break; - } - - for (auto& rs_reader : rs_readers) { - res = rs_reader->init(&reader_context); - if (!res.ok()) { - LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); - break; - } - } - - } while (0); - } - - do { - if (!res.ok()) { - break; - } - StorageMigrationParams sm_params; - sm_params.base_tablet = base_tablet; - sm_params.new_tablet = new_tablet; - sm_params.ref_rowset_readers = rs_readers; - sm_params.delete_handler = &delete_handler; - - res = _convert_historical_rowsets(sm_params); - if (!res.ok()) { - break; - } - // set state to ready - std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock()); - res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); - if (!res.ok()) { - break; - } - new_tablet->save_meta(); - } while (0); - - if (res.ok()) { - // _validate_migration_result should be outside the above while loop. - // to avoid requiring the header lock twice. - res = _validate_migration_result(new_tablet, request); - } - - // if failed convert history data, then just remove the new tablet - if (!res.ok()) { - LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name() - << ", drop new_tablet=" << new_tablet->full_name(); - // do not drop the new tablet and its data. GC thread will - } - - return res; -} - -Status StorageMigrationV2Handler::_get_versions_to_be_changed( - TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed) { - RowsetSharedPtr rowset = base_tablet->rowset_with_max_version(); - if (rowset == nullptr) { - LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name(); - return Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS); - } - - std::vector<Version> span_versions; - RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), - &span_versions)); - versions_to_be_changed->insert(versions_to_be_changed->end(), span_versions.begin(), - span_versions.end()); - - return Status::OK(); -} - -Status StorageMigrationV2Handler::_convert_historical_rowsets( - const StorageMigrationParams& sm_params) { - LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet." - << " base_tablet=" << sm_params.base_tablet->full_name() - << ", new_tablet=" << sm_params.new_tablet->full_name(); - - // find end version - int32_t end_version = -1; - for (size_t i = 0; i < sm_params.ref_rowset_readers.size(); ++i) { - if (sm_params.ref_rowset_readers[i]->version().second > end_version) { - end_version = sm_params.ref_rowset_readers[i]->version().second; - } - } - - Status res = Status::OK(); - for (auto& rs_reader : sm_params.ref_rowset_readers) { - VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first - << "-" << rs_reader->version().second; - - TabletSharedPtr new_tablet = sm_params.new_tablet; - - RowsetWriterContext writer_context; - writer_context.rowset_id = StorageEngine::instance()->next_rowset_id(); - writer_context.tablet_uid = new_tablet->tablet_uid(); - writer_context.tablet_id = new_tablet->tablet_id(); - writer_context.partition_id = new_tablet->partition_id(); - writer_context.tablet_schema_hash = new_tablet->schema_hash(); - // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now - writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type(); - if (sm_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { - writer_context.rowset_type = BETA_ROWSET; - } - writer_context.path_desc = new_tablet->tablet_path_desc(); - writer_context.tablet_schema = &(new_tablet->tablet_schema()); - writer_context.rowset_state = VISIBLE; - writer_context.version = rs_reader->version(); - writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); - writer_context.oldest_write_timestamp = rs_reader->oldest_write_timestamp(); - writer_context.newest_write_timestamp = rs_reader->newest_write_timestamp(); - - std::unique_ptr<RowsetWriter> rowset_writer; - Status status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer); - if (!status.ok()) { - res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT); - goto PROCESS_ALTER_EXIT; - } - - if (!(res = _generate_rowset_writer(sm_params.base_tablet->tablet_path_desc(), - sm_params.new_tablet->tablet_path_desc(), rs_reader, - rowset_writer.get(), new_tablet)) - .ok()) { - LOG(WARNING) << "failed to add_rowset. version=" << rs_reader->version().first << "-" - << rs_reader->version().second; - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + - rowset_writer->rowset_id().to_string()); - goto PROCESS_ALTER_EXIT; - } - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + - rowset_writer->rowset_id().to_string()); - // Add the new version of the data to the header - // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table - std::lock_guard<std::mutex> lock(sm_params.new_tablet->get_push_lock()); - RowsetSharedPtr new_rowset = rowset_writer->build(); - if (new_rowset == nullptr) { - LOG(WARNING) << "failed to build rowset, exit alter process"; - goto PROCESS_ALTER_EXIT; - } - res = sm_params.new_tablet->add_rowset(new_rowset); - if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { - LOG(WARNING) << "version already exist, version revert occurred. " - << "tablet=" << sm_params.new_tablet->full_name() << ", version='" - << rs_reader->version().first << "-" << rs_reader->version().second; - StorageEngine::instance()->add_unused_rowset(new_rowset); - res = Status::OK(); - } else if (!res.ok()) { - LOG(WARNING) << "failed to register new version. " - << " tablet=" << sm_params.new_tablet->full_name() - << ", version=" << rs_reader->version().first << "-" - << rs_reader->version().second; - StorageEngine::instance()->add_unused_rowset(new_rowset); - goto PROCESS_ALTER_EXIT; - } else { - VLOG_NOTICE << "register new version. tablet=" << sm_params.new_tablet->full_name() - << ", version=" << rs_reader->version().first << "-" - << rs_reader->version().second; - } - - VLOG_TRACE << "succeed to convert a history version." - << " version=" << rs_reader->version().first << "-" - << rs_reader->version().second; - } -// XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version -PROCESS_ALTER_EXIT : { - // save tablet meta here because rowset meta is not saved during add rowset - std::lock_guard<std::shared_mutex> new_wlock(sm_params.new_tablet->get_header_lock()); - sm_params.new_tablet->save_meta(); -} - if (res.ok()) { - Version test_version(0, end_version); - res = sm_params.new_tablet->check_version_integrity(test_version); - } - - LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. " - << "base_tablet=" << sm_params.base_tablet->full_name() - << ", new_tablet=" << sm_params.new_tablet->full_name(); - return res; -} - -Status StorageMigrationV2Handler::_validate_migration_result( - TabletSharedPtr new_tablet, const TStorageMigrationReqV2& request) { - Version max_continuous_version = {-1, 0}; - new_tablet->max_continuous_version_from_beginning(&max_continuous_version); - LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name() - << ", start_version=" << max_continuous_version.first - << ", end_version=" << max_continuous_version.second; - if (max_continuous_version.second < request.migration_version) { - return Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST); - } - - std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets; - { - std::shared_lock rdlock(new_tablet->get_header_lock(), std::try_to_lock); - new_tablet->acquire_version_and_rowsets(&version_rowsets); - } - for (auto& pair : version_rowsets) { - RowsetSharedPtr rowset = pair.second; - if (!rowset->check_file_exist()) { - return Status::OLAPInternalError(OLAP_ERR_FILE_NOT_EXIST); - } - } - return Status::OK(); -} - -Status StorageMigrationV2Handler::_generate_rowset_writer(const FilePathDesc& src_desc, - const FilePathDesc& dst_desc, - RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet) { - if (!src_desc.is_remote() && dst_desc.is_remote()) { - string remote_file_param_path = dst_desc.filepath + REMOTE_FILE_PARAM; - rapidjson::StringBuffer strbuf; - rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); - writer.StartObject(); - writer.Key(TABLET_UID.c_str()); - writer.String(TabletUid(new_tablet->tablet_uid()).to_string().c_str()); - writer.Key(STORAGE_NAME.c_str()); - writer.String(dst_desc.storage_name.c_str()); - writer.EndObject(); - Status st = env_util::write_string_to_file( - Env::Default(), Slice(std::string(strbuf.GetString())), remote_file_param_path); - // strbuf.GetString() format: {"tablet_uid": "a84cfb67d3ad3d62-87fd8b3ae9bdad84", "storage_name": "s3_name"} - if (!st.ok()) { - LOG(WARNING) << "fail to write tablet_uid and storage_name. path=" - << remote_file_param_path << ", error:" << st.to_string(); - return Status::OLAPInternalError(OLAP_ERR_COPY_FILE_ERROR); - } - LOG(INFO) << "write storage_param successfully: " << remote_file_param_path; - } - - return new_rowset_writer->add_rowset_for_migration(rowset_reader->rowset()); -} - -} // namespace doris diff --git a/be/src/olap/storage_migration_v2.h b/be/src/olap/storage_migration_v2.h deleted file mode 100644 index ae96c225ff..0000000000 --- a/be/src/olap/storage_migration_v2.h +++ /dev/null @@ -1,79 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H -#define DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H - -#include <deque> -#include <functional> -#include <queue> -#include <utility> -#include <vector> - -#include "gen_cpp/AgentService_types.h" -#include "olap/column_mapping.h" -#include "olap/delete_handler.h" -#include "olap/rowset/rowset.h" -#include "olap/rowset/rowset_writer.h" -#include "olap/tablet.h" - -namespace doris { - -class StorageMigrationV2Handler { -public: - static StorageMigrationV2Handler* instance() { - static StorageMigrationV2Handler instance; - return &instance; - } - - // schema change v2, it will not set alter task in base tablet - Status process_storage_migration_v2(const TStorageMigrationReqV2& request); - -private: - Status _get_versions_to_be_changed(TabletSharedPtr base_tablet, - std::vector<Version>* versions_to_be_changed); - - struct StorageMigrationParams { - TabletSharedPtr base_tablet; - TabletSharedPtr new_tablet; - std::vector<RowsetReaderSharedPtr> ref_rowset_readers; - DeleteHandler* delete_handler = nullptr; - }; - - Status _do_process_storage_migration_v2(const TStorageMigrationReqV2& request); - - Status _validate_migration_result(TabletSharedPtr new_tablet, - const TStorageMigrationReqV2& request); - - Status _convert_historical_rowsets(const StorageMigrationParams& sm_params); - - Status _generate_rowset_writer(const FilePathDesc& src_desc, const FilePathDesc& dst_desc, - RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet); - -private: - StorageMigrationV2Handler(); - virtual ~StorageMigrationV2Handler(); - StorageMigrationV2Handler(const StorageMigrationV2Handler&) = delete; - StorageMigrationV2Handler& operator=(const StorageMigrationV2Handler&) = delete; - - std::shared_ptr<MemTracker> _mem_tracker; -}; - -} // namespace doris - -#endif // DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 46b331dc97..3e20c3f230 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -150,7 +150,9 @@ public: Status capture_rs_readers(const std::vector<Version>& version_path, std::vector<RowsetReaderSharedPtr>* rs_readers) const; - DelPredicateArray delete_predicates() { return _tablet_meta->delete_predicates(); } + const std::vector<DeletePredicatePB>& delete_predicates() { + return _tablet_meta->delete_predicates(); + } void add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version); bool version_for_delete_predicate(const Version& version); bool version_for_load_deletion(const Version& version); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index ca3966264b..526ddec54d 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -204,7 +204,7 @@ TabletMeta::TabletMeta(const TabletMeta& b) _schema(b._schema), _rs_metas(b._rs_metas), _stale_rs_metas(b._stale_rs_metas), - _del_pred_array(b._del_pred_array), + _del_predicates(b._del_predicates), _in_restore_mode(b._in_restore_mode), _preferred_rowset_type(b._preferred_rowset_type), _remote_storage_name(b._remote_storage_name), @@ -682,40 +682,35 @@ RowsetMetaSharedPtr TabletMeta::acquire_stale_rs_meta_by_version(const Version& } void TabletMeta::add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version) { - for (auto& del_pred : _del_pred_array) { + for (auto& del_pred : _del_predicates) { if (del_pred.version() == version) { *del_pred.mutable_sub_predicates() = delete_predicate.sub_predicates(); return; } } - DeletePredicatePB* del_pred = _del_pred_array.Add(); - del_pred->set_version(version); - *del_pred->mutable_sub_predicates() = delete_predicate.sub_predicates(); - *del_pred->mutable_in_predicates() = delete_predicate.in_predicates(); + DeletePredicatePB copied_pred = delete_predicate; + copied_pred.set_version(version); + _del_predicates.emplace_back(copied_pred); } void TabletMeta::remove_delete_predicate_by_version(const Version& version) { DCHECK(version.first == version.second) << "version=" << version; - for (int ordinal = 0; ordinal < _del_pred_array.size(); ++ordinal) { - const DeletePredicatePB& temp = _del_pred_array.Get(ordinal); - if (temp.version() == version.first) { - // log delete condition - string del_cond_str; - for (const auto& it : temp.sub_predicates()) { - del_cond_str += it + ";"; - } - VLOG_NOTICE << "remove one del_pred. version=" << temp.version() - << ", condition=" << del_cond_str; - - // remove delete condition from PB - _del_pred_array.SwapElements(ordinal, _del_pred_array.size() - 1); - _del_pred_array.RemoveLast(); + int pred_to_del = -1; + for (int i = 0; i < _del_predicates.size(); ++i) { + if (_del_predicates[i].version() == version.first) { + pred_to_del = i; + // one DeletePredicatePB stands for a nested predicate, such as user submit a delete predicate a=1 and b=2 + // they could be saved as a one DeletePredicatePB + break; } } + if (pred_to_del > -1) { + _del_predicates.erase(_del_predicates.begin() + pred_to_del); + } } -DelPredicateArray TabletMeta::delete_predicates() const { - return _del_pred_array; +const std::vector<DeletePredicatePB>& TabletMeta::delete_predicates() const { + return _del_predicates; } bool TabletMeta::version_for_delete_predicate(const Version& version) { @@ -723,7 +718,7 @@ bool TabletMeta::version_for_delete_predicate(const Version& version) { return false; } - for (auto& del_pred : _del_pred_array) { + for (auto& del_pred : _del_predicates) { if (del_pred.version() == version.first) { return true; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 6c907dd6be..6e23ec8204 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -168,7 +168,7 @@ public: void add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version); void remove_delete_predicate_by_version(const Version& version); - DelPredicateArray delete_predicates() const; + const std::vector<DeletePredicatePB>& delete_predicates() const; bool version_for_delete_predicate(const Version& version); std::string full_name() const; @@ -211,7 +211,7 @@ public: private: Status _save_meta(DataDir* data_dir); - // _del_pred_array is ignored to compare. + // _del_predicates is ignored to compare. friend bool operator==(const TabletMeta& a, const TabletMeta& b); friend bool operator!=(const TabletMeta& a, const TabletMeta& b); @@ -238,7 +238,7 @@ private: // this policy is judged and computed by TimestampedVersionTracker. std::vector<RowsetMetaSharedPtr> _stale_rs_metas; - DelPredicateArray _del_pred_array; + std::vector<DeletePredicatePB> _del_predicates; bool _in_restore_mode = false; RowsetTypePB _preferred_rowset_type = BETA_ROWSET; std::string _remote_storage_name; diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp deleted file mode 100644 index 118213b657..0000000000 --- a/be/src/olap/task/engine_storage_migration_task_v2.cpp +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/task/engine_storage_migration_task_v2.h" - -#include "olap/storage_migration_v2.h" -#include "runtime/mem_tracker.h" - -namespace doris { - -using std::to_string; - -EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const TStorageMigrationReqV2& request) - : _storage_migration_req(request) { - _mem_tracker = MemTracker::create_tracker( - config::memory_limitation_per_thread_for_storage_migration_bytes, - fmt::format("EngineStorageMigrationTaskV2#baseTabletId{}:newTabletId{}", - std::to_string(_storage_migration_req.base_tablet_id), - std::to_string(_storage_migration_req.new_tablet_id)), - StorageEngine::instance()->storage_migration_mem_tracker(), MemTrackerLevel::TASK); -} - -Status EngineStorageMigrationTaskV2::execute() { - DorisMetrics::instance()->storage_migrate_v2_requests_total->increment(1); - StorageMigrationV2Handler* storage_migration_v2_handler = StorageMigrationV2Handler::instance(); - Status res = storage_migration_v2_handler->process_storage_migration_v2(_storage_migration_req); - - if (!res.ok()) { - LOG(WARNING) << "failed to do storage migration task. res=" << res - << " base_tablet_id=" << _storage_migration_req.base_tablet_id - << ", base_schema_hash=" << _storage_migration_req.base_schema_hash - << ", new_tablet_id=" << _storage_migration_req.new_tablet_id - << ", new_schema_hash=" << _storage_migration_req.new_schema_hash; - DorisMetrics::instance()->storage_migrate_v2_requests_failed->increment(1); - return res; - } - - LOG(INFO) << "success to create new storage migration v2. res=" << res - << " base_tablet_id=" << _storage_migration_req.base_tablet_id - << ", base_schema_hash=" << _storage_migration_req.base_schema_hash - << ", new_tablet_id=" << _storage_migration_req.new_tablet_id - << ", new_schema_hash=" << _storage_migration_req.new_schema_hash; - return res; -} // execute - -} // namespace doris diff --git a/be/src/olap/task/engine_storage_migration_task_v2.h b/be/src/olap/task/engine_storage_migration_task_v2.h deleted file mode 100644 index 81d57ff636..0000000000 --- a/be/src/olap/task/engine_storage_migration_task_v2.h +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H -#define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H - -#include "gen_cpp/AgentService_types.h" -#include "olap/olap_define.h" -#include "olap/task/engine_task.h" - -namespace doris { - -// base class for storage engine -// add "Engine" as task prefix to prevent duplicate name with agent task -class EngineStorageMigrationTaskV2 : public EngineTask { -public: - virtual Status execute(); - -public: - EngineStorageMigrationTaskV2(const TStorageMigrationReqV2& request); - ~EngineStorageMigrationTaskV2() {} - -private: - const TStorageMigrationReqV2& _storage_migration_req; - - std::shared_ptr<MemTracker> _mem_tracker; -}; // EngineTask - -} // namespace doris -#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H \ No newline at end of file diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp index b66791df87..b383fbc3a7 100644 --- a/be/test/olap/delete_handler_test.cpp +++ b/be/test/olap/delete_handler_test.cpp @@ -288,7 +288,6 @@ protected: TabletSharedPtr dup_tablet; TCreateTabletReq _create_tablet; TCreateTabletReq _create_dup_tablet; - DeleteConditionHandler _delete_condition_handler; }; TEST_F(TestDeleteConditionHandler, StoreCondSucceed) { @@ -340,8 +339,8 @@ TEST_F(TestDeleteConditionHandler, StoreCondSucceed) { conditions.push_back(condition); DeletePredicatePB del_pred; - success_res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), - conditions, &del_pred); + success_res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred); EXPECT_EQ(Status::OK(), success_res); // 验证存储在header中的过滤条件正确 @@ -364,8 +363,8 @@ TEST_F(TestDeleteConditionHandler, StoreCondInvalidParameters) { // 空的过滤条件 std::vector<TCondition> conditions; DeletePredicatePB del_pred; - Status failed_res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), - conditions, &del_pred); + Status failed_res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), + conditions, &del_pred); ; EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_PARAMETERS), failed_res); } @@ -381,8 +380,8 @@ TEST_F(TestDeleteConditionHandler, StoreCondNonexistentColumn) { condition.condition_values.push_back("2"); conditions.push_back(condition); DeletePredicatePB del_pred; - Status failed_res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), - conditions, &del_pred); + Status failed_res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), + conditions, &del_pred); ; EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), failed_res); @@ -394,8 +393,8 @@ TEST_F(TestDeleteConditionHandler, StoreCondNonexistentColumn) { condition.condition_values.push_back("5"); conditions.push_back(condition); - failed_res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), - conditions, &del_pred); + failed_res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred); ; EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), failed_res); @@ -407,8 +406,8 @@ TEST_F(TestDeleteConditionHandler, StoreCondNonexistentColumn) { condition.condition_values.push_back("5"); conditions.push_back(condition); - Status success_res = _delete_condition_handler.generate_delete_predicate( - dup_tablet->tablet_schema(), conditions, &del_pred); + Status success_res = DeleteHandler::generate_delete_predicate(dup_tablet->tablet_schema(), + conditions, &del_pred); ; EXPECT_EQ(Status::OK(), success_res); } @@ -452,7 +451,6 @@ protected: std::string _tablet_path; TabletSharedPtr tablet; TCreateTabletReq _create_tablet; - DeleteConditionHandler _delete_condition_handler; }; TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { @@ -486,8 +484,7 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, &del_pred); EXPECT_EQ(Status::OK(), res); // k5类型为int128 @@ -499,8 +496,8 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred_2; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_2); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_2); EXPECT_EQ(Status::OK(), res); // k9类型为decimal, precision=6, frac=3 @@ -512,29 +509,29 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred_3; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_3); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_3); EXPECT_EQ(Status::OK(), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2"); DeletePredicatePB del_pred_4; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_4); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_4); EXPECT_EQ(Status::OK(), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-2"); DeletePredicatePB del_pred_5; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_5); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_5); EXPECT_EQ(Status::OK(), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-2.3"); DeletePredicatePB del_pred_6; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_6); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_6); EXPECT_EQ(Status::OK(), res); // k10,k11类型分别为date, datetime @@ -552,8 +549,8 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred_7; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_7); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_7); EXPECT_EQ(Status::OK(), res); // k12,k13类型分别为string(64), varchar(64) @@ -571,8 +568,8 @@ TEST_F(TestDeleteConditionHandler2, ValidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred_8; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_8); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_8); EXPECT_EQ(Status::OK(), res); } @@ -589,16 +586,16 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions.push_back(condition); DeletePredicatePB del_pred_1; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_1); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_1); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k1的值越下界,k1类型为int8 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-1000"); DeletePredicatePB del_pred_2; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_2); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_2); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k2的值越上界,k2类型为int16 @@ -606,16 +603,16 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k2"; conditions[0].condition_values.push_back("32768"); DeletePredicatePB del_pred_3; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_3); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_3); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k2的值越下界,k2类型为int16 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-32769"); DeletePredicatePB del_pred_4; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_4); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_4); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k3的值越上界,k3类型为int32 @@ -623,16 +620,16 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k3"; conditions[0].condition_values.push_back("2147483648"); DeletePredicatePB del_pred_5; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_5); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_5); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k3的值越下界,k3类型为int32 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-2147483649"); DeletePredicatePB del_pred_6; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_6); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_6); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k4的值越上界,k2类型为int64 @@ -640,16 +637,16 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k4"; conditions[0].condition_values.push_back("9223372036854775808"); DeletePredicatePB del_pred_7; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_7); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_7); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k4的值越下界,k1类型为int64 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-9223372036854775809"); DeletePredicatePB del_pred_8; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_8); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_8); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k5的值越上界,k5类型为int128 @@ -657,16 +654,16 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k5"; conditions[0].condition_values.push_back("170141183460469231731687303715884105728"); DeletePredicatePB del_pred_9; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_9); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_9); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k5的值越下界,k5类型为int128 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("-170141183460469231731687303715884105729"); DeletePredicatePB del_pred_10; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_10); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_10); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k9整数部分长度过长,k9类型为decimal, precision=6, frac=3 @@ -674,24 +671,24 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k9"; conditions[0].condition_values.push_back("12347876.5"); DeletePredicatePB del_pred_11; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_11); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_11); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k9小数部分长度过长,k9类型为decimal, precision=6, frac=3 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("1.2345678"); DeletePredicatePB del_pred_12; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_12); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_12); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k9没有小数部分,但包含小数点 conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("1."); DeletePredicatePB del_pred_13; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_13); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_13); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k10类型的过滤值不符合对应格式,k10为date @@ -699,22 +696,22 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k10"; conditions[0].condition_values.push_back("20130101"); DeletePredicatePB del_pred_14; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_14); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_14); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-64-01"); DeletePredicatePB del_pred_15; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_15); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_15); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-01-40"); DeletePredicatePB del_pred_16; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_16); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_16); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k11类型的过滤值不符合对应格式,k11为datetime @@ -722,43 +719,43 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { conditions[0].column_name = "k11"; conditions[0].condition_values.push_back("20130101 00:00:00"); DeletePredicatePB del_pred_17; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_17); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_17); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-64-01 00:00:00"); DeletePredicatePB del_pred_18; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_18); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_18); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-01-40 00:00:00"); DeletePredicatePB del_pred_19; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_19); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_19); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-01-01 24:00:00"); DeletePredicatePB del_pred_20; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_20); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_20); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-01-01 00:60:00"); DeletePredicatePB del_pred_21; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_21); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_21); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); conditions[0].condition_values.push_back("2013-01-01 00:00:60"); DeletePredicatePB del_pred_22; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_22); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_22); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); // 测试k12和k13类型的过滤值过长,k12,k13类型分别为string(64), varchar(64) @@ -769,8 +766,8 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { "FhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYW" "FhYWFhYWFhYWFhYWFhYWFhYWFhYWE=;k13=YWFhYQ=="); DeletePredicatePB del_pred_23; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_23); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_23); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); conditions[0].condition_values.clear(); @@ -780,8 +777,8 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) { "FhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYW" "FhYWFhYWFhYWFhYWFhYWFhYWFhYWE=;k13=YWFhYQ=="); DeletePredicatePB del_pred_24; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_24); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_24); EXPECT_EQ(Status::OLAPInternalError(OLAP_ERR_DELETE_INVALID_CONDITION), res); } @@ -832,7 +829,6 @@ protected: TabletSharedPtr tablet; TCreateTabletReq _create_tablet; DeleteHandler _delete_handler; - DeleteConditionHandler _delete_condition_handler; }; TEST_F(TestDeleteHandler, InitSuccess) { @@ -860,8 +856,7 @@ TEST_F(TestDeleteHandler, InitSuccess) { conditions.push_back(condition); DeletePredicatePB del_pred; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, &del_pred); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred, 1); @@ -873,8 +868,8 @@ TEST_F(TestDeleteHandler, InitSuccess) { conditions.push_back(condition); DeletePredicatePB del_pred_2; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_2); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_2); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_2, 2); @@ -886,8 +881,8 @@ TEST_F(TestDeleteHandler, InitSuccess) { conditions.push_back(condition); DeletePredicatePB del_pred_3; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_3); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_3); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_3, 3); @@ -899,8 +894,8 @@ TEST_F(TestDeleteHandler, InitSuccess) { conditions.push_back(condition); DeletePredicatePB del_pred_4; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_4); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_4); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_4, 4); @@ -941,8 +936,7 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) { conditions.push_back(condition); DeletePredicatePB del_pred; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, &del_pred); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred, 1); @@ -1002,8 +996,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) { conditions.push_back(condition); DeletePredicatePB del_pred; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, &del_pred); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred, 1); @@ -1016,8 +1009,8 @@ TEST_F(TestDeleteHandler, FilterDataConditions) { conditions.push_back(condition); DeletePredicatePB del_pred_2; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_2); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_2); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_2, 2); @@ -1030,8 +1023,8 @@ TEST_F(TestDeleteHandler, FilterDataConditions) { conditions.push_back(condition); DeletePredicatePB del_pred_3; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_3); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_3); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_3, 3); @@ -1082,8 +1075,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) { conditions.push_back(condition); DeletePredicatePB del_pred; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, &del_pred); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred, 3); @@ -1096,8 +1088,8 @@ TEST_F(TestDeleteHandler, FilterDataVersion) { conditions.push_back(condition); DeletePredicatePB del_pred_2; - res = _delete_condition_handler.generate_delete_predicate(tablet->tablet_schema(), conditions, - &del_pred_2); + res = DeleteHandler::generate_delete_predicate(tablet->tablet_schema(), conditions, + &del_pred_2); EXPECT_EQ(Status::OK(), res); tablet->add_delete_predicate(del_pred_2, 4); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org