This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 5805f8077f [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003) 5805f8077f is described below commit 5805f8077f77295f2dd58a3fc7b6660c7fb24262 Author: Pxl <952130...@qq.com> AuthorDate: Thu Jun 16 10:50:08 2022 +0800 [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003) --- be/src/agent/task_worker_pool.cpp | 6 +- be/src/common/config.h | 3 + be/src/olap/push_handler.cpp | 32 +- be/src/olap/push_handler.h | 43 +- be/src/olap/reader.cpp | 30 +- be/src/olap/reader.h | 12 +- be/src/olap/rowset/beta_rowset_reader.h | 3 +- be/src/olap/schema_change.cpp | 493 ++++++++------------- be/src/olap/schema_change.h | 166 ++++--- be/src/olap/tablet.cpp | 3 +- be/src/olap/task/engine_alter_tablet_task.cpp | 9 +- be/src/olap/task/engine_alter_tablet_task.h | 8 +- .../olap/task/engine_storage_migration_task_v2.cpp | 4 +- be/src/olap/tuple_reader.cpp | 6 +- be/src/olap/tuple_reader.h | 15 - be/src/runtime/descriptors.cpp | 15 +- be/src/runtime/descriptors.h | 55 ++- be/src/runtime/runtime_state.cpp | 14 + be/src/runtime/runtime_state.h | 25 +- be/src/runtime/thread_mem_tracker_mgr.h | 3 +- be/src/vec/columns/column.h | 14 +- be/src/vec/olap/block_reader.cpp | 7 +- be/src/vec/olap/block_reader.h | 7 - be/test/olap/schema_change_test.cpp | 6 +- .../java/org/apache/doris/alter/RollupJobV2.java | 23 +- .../org/apache/doris/alter/SchemaChangeJobV2.java | 51 ++- .../java/org/apache/doris/analysis/Analyzer.java | 7 +- .../java/org/apache/doris/analysis/InsertStmt.java | 6 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 6 +- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 6 +- .../apache/doris/load/update/UpdatePlanner.java | 6 +- .../org/apache/doris/task/AlterReplicaTask.java | 28 +- .../doris/planner/StreamLoadScanNodeTest.java | 90 +--- gensrc/thrift/AgentService.thrift | 1 + 34 files changed, 552 insertions(+), 
651 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 79953f6709..06ca61eef0 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -535,7 +535,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6 if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) { error_msgs.push_back("The data quality does not satisfy, please check your data. "); } - status = Status::DataQualityError("The data quality does not satisfy"); + status = sc_status; } else { status = Status::OK(); } @@ -620,7 +620,7 @@ void TaskWorkerPool::_push_worker_thread_callback() { agent_task_req = _tasks[index]; push_req = agent_task_req.push_req; _tasks.erase(_tasks.begin() + index); - } while (0); + } while (false); if (index < 0) { // there is no high priority task in queue @@ -1764,7 +1764,7 @@ void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_t if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) { error_msgs.push_back("The data quality does not satisfy, please check your data. 
"); } - status = Status::DataQualityError("The data quality does not satisfy"); + status = sc_status; } else { status = Status::OK(); } diff --git a/be/src/common/config.h b/be/src/common/config.h index 8a922f076e..4885c17193 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -244,6 +244,9 @@ CONF_Bool(enable_low_cardinality_optimize, "true"); CONF_mBool(disable_auto_compaction, "false"); // whether enable vectorized compaction CONF_Bool(enable_vectorized_compaction, "true"); +// whether enable vectorized schema change +CONF_Bool(enable_vectorized_alter_table, "false"); + // check the configuration of auto compaction in seconds when auto compaction disabled CONF_mInt32(check_auto_compaction_interval_seconds, "5"); diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 02e0d46dc9..4ab7ecfddd 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -22,6 +22,7 @@ #include <iostream> #include <sstream> +#include "common/object_pool.h" #include "common/status.h" #include "exec/parquet_scanner.h" #include "olap/row.h" @@ -32,11 +33,6 @@ #include "olap/tablet.h" #include "runtime/exec_env.h" -using std::list; -using std::map; -using std::string; -using std::vector; - namespace doris { // Process push command, the main logical is as follows: @@ -60,6 +56,9 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP Status res = Status::OK(); _request = request; + + DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl); + std::vector<TabletVars> tablet_vars(1); tablet_vars[0].tablet = tablet; res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec); @@ -315,16 +314,15 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_ // 5. 
Convert data for schema change tables VLOG_TRACE << "load to related tables of schema_change if possible."; if (new_tablet != nullptr) { - auto schema_change_handler = SchemaChangeHandler::instance(); - res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset, - new_rowset); + res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset, + new_rowset, *_desc_tbl); if (!res.ok()) { LOG(WARNING) << "failed to change schema version for delta." << "[res=" << res << " new_tablet='" << new_tablet->full_name() << "']"; } } - } while (0); + } while (false); VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name() << ", processed_rows" << num_rows; @@ -456,16 +454,15 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab // 7. Convert data for schema change tables VLOG_TRACE << "load to related tables of schema_change if possible."; if (new_tablet != nullptr) { - auto schema_change_handler = SchemaChangeHandler::instance(); - res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset, - new_rowset); + res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset, + new_rowset, *_desc_tbl); if (!res.ok()) { LOG(WARNING) << "failed to change schema version for delta." << "[res=" << res << " new_tablet='" << new_tablet->full_name() << "']"; } } - } while (0); + } while (false); SAFE_DELETE(reader); VLOG_TRACE << "convert delta file end. 
res=" << res << ", tablet=" << cur_tablet->full_name() @@ -502,7 +499,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) { return reader; } -BinaryReader::BinaryReader() : IBinaryReader(), _row_buf(nullptr), _row_buf_size(0) {} +BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {} Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) { Status res = Status::OK(); @@ -527,7 +524,7 @@ Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) { _tablet = tablet; _ready = true; - } while (0); + } while (false); if (!res.ok()) { SAFE_DELETE_ARRAY(_row_buf); @@ -637,8 +634,7 @@ Status BinaryReader::next(RowCursor* row) { } LzoBinaryReader::LzoBinaryReader() - : IBinaryReader(), - _row_buf(nullptr), + : _row_buf(nullptr), _row_compressed_buf(nullptr), _row_info_buf(nullptr), _max_row_num(0), @@ -670,7 +666,7 @@ Status LzoBinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) { _tablet = tablet; _ready = true; - } while (0); + } while (false); if (!res.ok()) { SAFE_DELETE_ARRAY(_row_info_buf); diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index 13da3018d4..a290eb01c7 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -45,10 +45,10 @@ struct TabletVars { class PushHandler { public: - typedef std::vector<ColumnMapping> SchemaMapping; + using SchemaMapping = std::vector<ColumnMapping>; - PushHandler() {} - ~PushHandler() {} + PushHandler() = default; + ~PushHandler() = default; // Load local data file into specified tablet. 
Status process_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request, @@ -80,6 +80,9 @@ private: // mainly tablet_id, version and delta file path TPushReq _request; + ObjectPool _pool; + DescriptorTbl* _desc_tbl = nullptr; + int64_t _write_bytes = 0; int64_t _write_rows = 0; DISALLOW_COPY_AND_ASSIGN(PushHandler); @@ -88,7 +91,7 @@ private: // package FileHandlerWithBuf to read header of dpp output file class BinaryFile : public FileHandlerWithBuf { public: - BinaryFile() {} + BinaryFile() = default; virtual ~BinaryFile() { close(); } Status init(const char* path); @@ -107,7 +110,7 @@ private: class IBinaryReader { public: static IBinaryReader* create(bool need_decompress); - virtual ~IBinaryReader() {} + virtual ~IBinaryReader() = default; virtual Status init(TabletSharedPtr tablet, BinaryFile* file) = 0; virtual Status finalize() = 0; @@ -139,14 +142,14 @@ protected: class BinaryReader : public IBinaryReader { public: explicit BinaryReader(); - virtual ~BinaryReader() { finalize(); } + ~BinaryReader() override { finalize(); } - virtual Status init(TabletSharedPtr tablet, BinaryFile* file); - virtual Status finalize(); + Status init(TabletSharedPtr tablet, BinaryFile* file) override; + Status finalize() override; - virtual Status next(RowCursor* row); + Status next(RowCursor* row) override; - virtual bool eof() { return _curr >= _content_len; } + bool eof() override { return _curr >= _content_len; } private: char* _row_buf; @@ -156,20 +159,20 @@ private: class LzoBinaryReader : public IBinaryReader { public: explicit LzoBinaryReader(); - virtual ~LzoBinaryReader() { finalize(); } + ~LzoBinaryReader() override { finalize(); } - virtual Status init(TabletSharedPtr tablet, BinaryFile* file); - virtual Status finalize(); + Status init(TabletSharedPtr tablet, BinaryFile* file) override; + Status finalize() override; - virtual Status next(RowCursor* row); + Status next(RowCursor* row) override; - virtual bool eof() { return _curr >= _content_len && _row_num 
== 0; } + bool eof() override { return _curr >= _content_len && _row_num == 0; } private: Status _next_block(); - typedef uint32_t RowNumType; - typedef uint64_t CompressedSizeType; + using RowNumType = uint32_t; + using CompressedSizeType = uint64_t; char* _row_buf; char* _row_compressed_buf; @@ -184,7 +187,7 @@ private: class PushBrokerReader { public: PushBrokerReader() : _ready(false), _eof(false), _fill_tuple(false) {} - ~PushBrokerReader() {} + ~PushBrokerReader() = default; Status init(const Schema* schema, const TBrokerScanRange& t_scan_range, const TDescriptorTable& t_desc_tbl); @@ -195,8 +198,8 @@ public: _ready = false; return Status::OK(); } - bool eof() { return _eof; } - bool is_fill_tuple() { return _fill_tuple; } + bool eof() const { return _eof; } + bool is_fill_tuple() const { return _fill_tuple; } MemPool* mem_pool() { return _mem_pool.get(); } private: diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 0d59b7f969..497c014e46 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -23,23 +23,19 @@ #include <charconv> #include <unordered_set> +#include "common/status.h" #include "olap/bloom_filter_predicate.h" #include "olap/comparison_predicate.h" #include "olap/in_list_predicate.h" #include "olap/null_predicate.h" +#include "olap/olap_common.h" #include "olap/row.h" -#include "olap/row_block.h" #include "olap/row_cursor.h" -#include "olap/rowset/beta_rowset_reader.h" -#include "olap/rowset/column_data.h" #include "olap/schema.h" -#include "olap/storage_engine.h" #include "olap/tablet.h" #include "runtime/mem_pool.h" -#include "runtime/string_value.hpp" #include "util/date_func.h" #include "util/mem_util.hpp" -#include "vec/olap/vcollect_iterator.h" using std::nothrow; using std::set; @@ -313,7 +309,8 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) { } VLOG_NOTICE << "return column is empty, using full column as default."; } else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION 
|| - read_params.reader_type == READER_BASE_COMPACTION) && + read_params.reader_type == READER_BASE_COMPACTION || + read_params.reader_type == READER_ALTER_TABLE) && !read_params.return_columns.empty()) { _return_columns = read_params.return_columns; for (auto id : read_params.return_columns) { @@ -834,12 +831,6 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) { return Status::OK(); } - Status ret; - { - std::shared_lock rdlock(_tablet->get_header_lock()); - ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(), - read_params.version.second, this); - } // Only BASE_COMPACTION need set filter_delete = true // other reader type: // QUERY will filter the row in query layer to keep right result use where clause. @@ -847,7 +838,18 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { if (read_params.reader_type == READER_BASE_COMPACTION) { _filter_delete = true; } - return ret; + + auto delete_init = [&]() -> Status { + return _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(), + read_params.version.second, this); + }; + + if (read_params.reader_type == READER_ALTER_TABLE) { + return delete_init(); + } + + std::shared_lock rdlock(_tablet->get_header_lock()); + return delete_init(); } } // namespace doris diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index f91145d07c..2593a51bae 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -20,20 +20,10 @@ #include <gen_cpp/PaloInternalService_types.h> #include <thrift/protocol/TDebugProtocol.h> -#include <list> -#include <memory> -#include <queue> -#include <sstream> -#include <stack> -#include <string> -#include <utility> -#include <vector> - #include "exprs/bloomfilter_predicate.h" #include "olap/column_predicate.h" #include "olap/delete_handler.h" #include "olap/olap_cond.h" -#include "olap/olap_define.h" #include 
"olap/row_cursor.h" #include "olap/rowset/rowset_reader.h" #include "olap/tablet.h" @@ -130,7 +120,7 @@ public: uint64_t filtered_rows() const { return _stats.rows_del_filtered + _stats.rows_conditions_filtered + - _stats.rows_vec_del_cond_filtered; + _stats.rows_vec_del_cond_filtered + _stats.rows_vec_cond_filtered; } void set_batch_size(int batch_size) { _batch_size = batch_size; } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 564980bac8..de1251f13f 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -47,7 +47,8 @@ public: // Return the total number of filtered rows, will be used for validation of schema change int64_t filtered_rows() override { - return _stats->rows_del_filtered + _stats->rows_conditions_filtered; + return _stats->rows_del_filtered + _stats->rows_conditions_filtered + + _stats->rows_vec_del_cond_filtered + _stats->rows_vec_cond_filtered; } RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index f7d86d8d86..68724d29bc 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -17,15 +17,12 @@ #include "olap/schema_change.h" -#include <pthread.h> -#include <signal.h> - -#include <algorithm> #include <vector> -#include "agent/cgroups_mgr.h" -#include "common/resource_tls.h" +#include "common/status.h" +#include "gutil/integral_types.h" #include "olap/merger.h" +#include "olap/olap_common.h" #include "olap/row.h" #include "olap/row_block.h" #include "olap/row_cursor.h" @@ -33,21 +30,23 @@ #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/wrapper_field.h" -#include "runtime/exec_env.h" -#include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" #include "util/defer_op.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" +#include 
"vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/columns/column.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/olap/block_reader.h" -using std::deque; -using std::list; using std::nothrow; -using std::pair; -using std::string; -using std::stringstream; -using std::vector; namespace doris { +constexpr int ALTER_TABLE_BATCH_SIZE = 4096; + class RowBlockSorter { public: explicit RowBlockSorter(RowBlockAllocator* allocator); @@ -92,19 +91,20 @@ private: std::priority_queue<MergeElement> _heap; }; -RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) { +RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl) + : _desc_tbl(desc_tbl) { _schema_mapping.resize(tablet_schema.num_columns()); } RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, - const DeleteHandler* delete_handler) { + const DeleteHandler* delete_handler, DescriptorTbl desc_tbl) + : _desc_tbl(desc_tbl) { _schema_mapping.resize(tablet_schema.num_columns()); _delete_handler = delete_handler; } RowBlockChanger::~RowBlockChanger() { - SchemaMapping::iterator it = _schema_mapping.begin(); - for (; it != _schema_mapping.end(); ++it) { + for (auto it = _schema_mapping.begin(); it != _schema_mapping.end(); ++it) { SAFE_DELETE(it->default_value); } _schema_mapping.clear(); @@ -212,7 +212,7 @@ public: } private: - typedef std::pair<FieldType, FieldType> convert_type_pair; + using convert_type_pair = std::pair<FieldType, FieldType>; std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set; DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver); @@ -230,7 +230,7 @@ ConvertTypeResolver::ConvertTypeResolver() { add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_DATE>(); // supported type convert should annotate in doc: - // 
http://doris.incubator.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description + // https://doris.apache.org/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.html#alter-table-column // If type convert is supported here, you should check fe/src/main/java/org/apache/doris/catalog/ColumnType.java to supported it either // from varchar type add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>(); @@ -287,7 +287,7 @@ ConvertTypeResolver::ConvertTypeResolver() { add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>(); } -ConvertTypeResolver::~ConvertTypeResolver() {} +ConvertTypeResolver::~ConvertTypeResolver() = default; bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column, int field_idx, int ref_field_idx, MemPool* mem_pool) { @@ -496,7 +496,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data if (_schema_mapping[i].ref_column >= 0) { if (!_schema_mapping[i].materialized_function.empty()) { bool (*_do_materialized_transform)(RowCursor*, RowCursor*, const TabletColumn&, int, - int, MemPool*); + int, MemPool*) = nullptr; if (_schema_mapping[i].materialized_function == "to_bitmap") { _do_materialized_transform = to_bitmap; } else if (_schema_mapping[i].materialized_function == "hll_hash") { @@ -545,7 +545,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data mutable_block->get_row(new_row_index++, &write_helper); ref_block->get_row(row_index, &read_helper); - if (true == read_helper.is_null(ref_column)) { + if (read_helper.is_null(ref_column)) { write_helper.set_null(i); } else { write_helper.set_not_null(i); @@ -693,7 +693,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) { RowCursor helper_row; auto res = helper_row.init(_swap_row_block->tablet_schema()); - if (!res.ok()) { + if (!res) { LOG(WARNING) << "row cursor init failed.res:" << res; return 
false; } @@ -807,7 +807,7 @@ bool RowBlockAllocator::is_memory_enough_for_sorting(size_t num_rows, size_t all RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {} -RowBlockMerger::~RowBlockMerger() {} +RowBlockMerger::~RowBlockMerger() = default; bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer, uint64_t* merged_rows) { @@ -815,14 +815,24 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr RowCursor row_cursor; std::unique_ptr<MemPool> mem_pool(new MemPool("RowBlockMerger")); std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool()); + + auto merge_error = [&]() -> bool { + while (!_heap.empty()) { + MergeElement element = _heap.top(); + _heap.pop(); + SAFE_DELETE(element.row_cursor); + } + return false; + }; + if (row_cursor.init(_tablet->tablet_schema()) != Status::OK()) { LOG(WARNING) << "fail to init row cursor."; - goto MERGE_ERR; + return merge_error(); } if (!_make_heap(row_block_arr)) { // There is error log in _make_heap, so no need to more log. 
- goto MERGE_ERR; + return merge_error(); } row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); @@ -835,7 +845,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr if (KeysType::DUP_KEYS == _tablet->keys_type()) { if (rowset_writer->add_row(row_cursor) != Status::OK()) { LOG(WARNING) << "fail to add row to rowset writer."; - goto MERGE_ERR; + return merge_error(); } continue; } @@ -850,7 +860,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr agg_finalize_row(&row_cursor, mem_pool.get()); if (rowset_writer->add_row(row_cursor) != Status::OK()) { LOG(WARNING) << "fail to add row to rowset writer."; - goto MERGE_ERR; + return merge_error(); } // the memory allocate by mem pool has been copied, @@ -860,20 +870,11 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr } if (rowset_writer->flush() != Status::OK()) { LOG(WARNING) << "failed to finalizing writer."; - goto MERGE_ERR; + return merge_error(); } *merged_rows = tmp_merged_rows; return true; - -MERGE_ERR: - while (_heap.size() > 0) { - MergeElement element = _heap.top(); - _heap.pop(); - SAFE_DELETE(element.row_cursor); - } - - return false; } bool RowBlockMerger::_make_heap(const std::vector<RowBlock*>& row_block_arr) { @@ -914,40 +915,35 @@ void RowBlockMerger::_pop_heap() { element.row_block->get_row(element.row_block_index, element.row_cursor); _heap.push(element); - return; } -Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { +Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) { // In some cases, there may be more than one type of rowset in a tablet, // in which case the conversion cannot be done directly by linked schema change, // but requires direct schema change to 
rewrite the data. - if (rowset_reader->type() != new_rowset_writer->type()) { + if (rowset_reader->type() != rowset_writer->type()) { LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id() << " in base tablet " << base_tablet->tablet_id() << " is not same as type " - << new_rowset_writer->type() << ", use direct schema change."; - SchemaChangeDirectly scd(_row_block_changer); - return scd.process(rowset_reader, new_rowset_writer, new_tablet, base_tablet); + << rowset_writer->type() << ", use direct schema change."; + return SchemaChangeHandler::get_sc_procedure(_row_block_changer, false, true) + ->process(rowset_reader, rowset_writer, new_tablet, base_tablet); } else { - Status status = new_rowset_writer->add_rowset_for_linked_schema_change( + Status status = rowset_writer->add_rowset_for_linked_schema_change( rowset_reader->rowset(), _row_block_changer.get_schema_mapping()); - if (!status.ok()) { + if (!status) { LOG(WARNING) << "fail to convert rowset." << ", new_tablet=" << new_tablet->full_name() << ", base_tablet=" << base_tablet->full_name() - << ", version=" << new_rowset_writer->version().first << "-" - << new_rowset_writer->version().second; + << ", version=" << rowset_writer->version().first << "-" + << rowset_writer->version().second; } return status; } } SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer) - : SchemaChange(), - _row_block_changer(row_block_changer), - _row_block_allocator(nullptr), - _cursor(nullptr) {} + : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {} SchemaChangeDirectly::~SchemaChangeDirectly() { VLOG_NOTICE << "~SchemaChangeDirectly()"; @@ -985,9 +981,9 @@ Status reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_pt return Status::OK(); } -Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* rowset_writer, TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { +Status 
SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet) { if (_row_block_allocator == nullptr) { _row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0); if (_row_block_allocator == nullptr) { @@ -1010,16 +1006,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, } Status res = Status::OK(); - if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { - res = rowset_writer->flush(); - if (!res.ok()) { - LOG(WARNING) << "create empty version for schema change failed." - << "version=" << rowset_writer->version().first << "-" - << rowset_writer->version().second; - return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); - } - return Status::OK(); - } VLOG_NOTICE << "init writer. new_tablet=" << new_tablet->full_name() << ", block_row_number=" << new_tablet->num_rows_per_row_block(); @@ -1030,10 +1016,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, } }); - // Reset filtered_rows and merged_rows statistic - reset_merged_rows(); - reset_filtered_rows(); - RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); while (ref_row_block != nullptr && ref_row_block->has_remaining()) { @@ -1049,7 +1031,7 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RETURN_NOT_OK_LOG(res, "failed to change data in row block."); // rows filtered by delete handler one by one - add_filtered_rows(filtered_rows); + _add_filtered_rows(filtered_rows); if (!_write_row_block(rowset_writer, new_row_block.get())) { res = Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID); @@ -1065,48 +1047,25 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); } - // rows filtered by zone map against delete handler - 
add_filtered_rows(rowset_reader->filtered_rows()); - - // Check row num changes - if (config::row_nums_check) { - if (rowset_reader->rowset()->num_rows() != - rowset_writer->num_rows() + merged_rows() + filtered_rows()) { - LOG(WARNING) << "fail to check row num! " - << "source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() - << ", new_index_rows=" << rowset_writer->num_rows(); - res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); - } - } - LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() - << ", new_index_rows=" << rowset_writer->num_rows(); return res; } SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation) - : SchemaChange(), - _row_block_changer(row_block_changer), + : _row_block_changer(row_block_changer), _memory_limitation(memory_limitation), - _row_block_allocator(nullptr) { - // Every time SchemaChange is used for external rowing, some temporary versions (such as 999, 1000, 1001) will be written, in order to avoid Cache conflicts, temporary - // The version number takes a BIG NUMBER plus the version number of the current SchemaChange - _temp_delta_versions.first = (1 << 28); - _temp_delta_versions.second = (1 << 28); - // TODO(zyh): remove the magic number -} + _temp_delta_versions(Version::mock()), + _row_block_allocator(nullptr) {} SchemaChangeWithSorting::~SchemaChangeWithSorting() { VLOG_NOTICE << "~SchemaChangeWithSorting()"; SAFE_DELETE(_row_block_allocator); } -Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { +Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, + 
TabletSharedPtr base_tablet) { if (_row_block_allocator == nullptr) { _row_block_allocator = new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation); @@ -1119,17 +1078,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, Status res = Status::OK(); RowsetSharedPtr rowset = rowset_reader->rowset(); - if (rowset->empty() || rowset->num_rows() == 0) { - res = new_rowset_writer->flush(); - if (!res.ok()) { - LOG(WARNING) << "create empty version for schema change failed." - << " version=" << new_rowset_writer->version().first << "-" - << new_rowset_writer->version().second; - return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); - } - return Status::OK(); - } - RowBlockSorter row_block_sorter(_row_block_allocator); // for internal sorting @@ -1155,10 +1103,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, _temp_delta_versions.first = _temp_delta_versions.second; - // Reset filtered_rows and merged_rows statistic - reset_merged_rows(); - reset_filtered_rows(); - SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); @@ -1180,7 +1124,7 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, } if (new_row_block == nullptr) { - if (row_block_arr.size() < 1) { + if (row_block_arr.empty()) { LOG(WARNING) << "Memory limitation is too small for Schema Change." 
<< "memory_limitation=" << _memory_limitation; return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); @@ -1212,12 +1156,12 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, uint64_t filtered_rows = 0; res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, new_row_block, &filtered_rows); - if (!res.ok()) { + if (!res) { row_block_arr.push_back(new_row_block); LOG(WARNING) << "failed to change data in row block."; return res; } - add_filtered_rows(filtered_rows); + _add_filtered_rows(filtered_rows); if (new_row_block->row_block_info().row_num > 0) { if (!row_block_sorter.sort(&new_row_block)) { @@ -1260,35 +1204,18 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, } if (src_rowsets.empty()) { - res = new_rowset_writer->flush(); - if (!res.ok()) { + res = rowset_writer->flush(); + if (!res) { LOG(WARNING) << "create empty version for schema change failed." - << " version=" << new_rowset_writer->version().first << "-" - << new_rowset_writer->version().second; + << " version=" << rowset_writer->version().first << "-" + << rowset_writer->version().second; return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); } - } else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) { + } else if (!_external_sorting(src_rowsets, rowset_writer, new_tablet)) { LOG(WARNING) << "failed to sorting externally."; return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); } - add_filtered_rows(rowset_reader->filtered_rows()); - - // Check row num changes - if (config::row_nums_check) { - if (rowset_reader->rowset()->num_rows() != - new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) { - LOG(WARNING) << "fail to check row num!" 
- << " source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() - << ", new_index_rows=" << new_rowset_writer->num_rows(); - res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); - } - } - LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() - << ", new_index_rows=" << new_rowset_writer->num_rows(); return res; } @@ -1315,7 +1242,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro } new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); - add_merged_rows(merged_rows); + _add_merged_rows(merged_rows); *rowset = rowset_writer->build(); return true; } @@ -1327,31 +1254,27 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row for (auto& rowset : src_rowsets) { RowsetReaderSharedPtr rs_reader; auto res = rowset->create_reader(&rs_reader); - if (!res.ok()) { + if (!res) { LOG(WARNING) << "failed to create rowset reader."; return false; } - rs_readers.push_back(std::move(rs_reader)); + rs_readers.push_back(rs_reader); } Merger::Statistics stats; auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer, &stats); - if (!res.ok()) { + if (!res) { LOG(WARNING) << "failed to merge rowsets. 
tablet=" << new_tablet->full_name() << ", version=" << rowset_writer->version().first << "-" << rowset_writer->version().second; return false; } - add_merged_rows(stats.merged_rows); - add_filtered_rows(stats.filtered_rows); + _add_merged_rows(stats.merged_rows); + _add_filtered_rows(stats.filtered_rows); return true; } -SchemaChangeHandler::SchemaChangeHandler() {} - -SchemaChangeHandler::~SchemaChangeHandler() {} - Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) { LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id @@ -1377,6 +1300,9 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req return res; } +std::shared_mutex SchemaChangeHandler::_mutex; +std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting; + // In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished // It will cost a lot of time to wait and the task is very difficult to understand. // In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data. @@ -1457,12 +1383,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& reader_context.seek_columns = &return_columns; reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; + reader_context.batch_size = ALTER_TABLE_BATCH_SIZE; + reader_context.is_vec = config::enable_vectorized_alter_table; do { RowsetSharedPtr max_rowset; // get history data to be converted and it will check if there is hold in base tablet - res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset); - if (!res.ok()) { + if (!_get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset)) { LOG(WARNING) << "fail to get version to be changed. 
res=" << res; break; } @@ -1514,27 +1441,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } // init one delete handler - int32_t end_version = -1; + int64_t end_version = -1; for (auto& version : versions_to_be_changed) { - if (version.second > end_version) { - end_version = version.second; - } - } - - res = delete_handler.init(base_tablet->tablet_schema(), - base_tablet->delete_predicates(), end_version); - if (!res.ok()) { - LOG(WARNING) << "init delete handler failed. base_tablet=" - << base_tablet->full_name() << ", end_version=" << end_version; - - // release delete handlers which have been inited successfully. - delete_handler.finalize(); - break; + end_version = std::max(end_version, version.second); } // acquire data sources correspond to history versions base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers); - if (rs_readers.size() < 1) { + if (rs_readers.empty()) { LOG(WARNING) << "fail to acquire all data sources. " << "version_num=" << versions_to_be_changed.size() << ", data_source_num=" << rs_readers.size(); @@ -1542,22 +1456,47 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& break; } + vectorized::BlockReader reader; + TabletReader::ReaderParams reader_params; + reader_params.tablet = base_tablet; + reader_params.reader_type = READER_ALTER_TABLE; + reader_params.rs_readers = rs_readers; + const auto& schema = base_tablet->tablet_schema(); + reader_params.return_columns.resize(schema.num_columns()); + std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0); + reader_params.origin_return_columns = &reader_params.return_columns; + reader_params.version = {0, end_version}; + // BlockReader::init will call base_tablet->get_header_lock(), but this lock we already get at outer layer, so we just call TabletReader::init + RETURN_NOT_OK(reader.TabletReader::init(reader_params)); + + res = delete_handler.init(base_tablet->tablet_schema(), + 
base_tablet->delete_predicates(), end_version, &reader); + if (!res) { + LOG(WARNING) << "init delete handler failed. base_tablet=" + << base_tablet->full_name() << ", end_version=" << end_version; + + // release delete handlers which have been inited successfully. + delete_handler.finalize(); + break; + } + for (auto& rs_reader : rs_readers) { res = rs_reader->init(&reader_context); - if (!res.ok()) { + if (!res) { LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name(); break; } } - - } while (0); + } while (false); } do { - if (!res.ok()) { + if (!res) { break; } SchemaChangeParams sc_params; + + DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl); sc_params.base_tablet = base_tablet; sc_params.new_tablet = new_tablet; sc_params.ref_rowset_readers = rs_readers; @@ -1588,6 +1527,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& } else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) { mv_param.mv_expr = "count_field"; } + mv_param.expr = std::make_shared<TExpr>(item.mv_expr); } sc_params.materialized_params_map.insert( std::make_pair(item.column_name, mv_param)); @@ -1602,26 +1542,26 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& std::lock_guard<std::shared_mutex> wrlock(_mutex); _tablet_ids_in_converting.erase(new_tablet->tablet_id()); } - if (!res.ok()) { + if (!res) { break; } // set state to ready std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock()); res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); - if (!res.ok()) { + if (!res) { break; } new_tablet->save_meta(); - } while (0); + } while (false); - if (res.ok()) { + if (res) { // _validate_alter_result should be outside the above while loop. // to avoid requiring the header lock twice. 
res = _validate_alter_result(new_tablet, request); } // if failed convert history data, then just remove the new tablet - if (!res.ok()) { + if (!res) { LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name() << ", drop new_tablet=" << new_tablet->full_name(); // do not drop the new tablet and its data. GC thread will @@ -1638,7 +1578,8 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) { Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowsetSharedPtr* base_rowset, - RowsetSharedPtr* new_rowset) { + RowsetSharedPtr* new_rowset, + DescriptorTbl desc_tbl) { Status res = Status::OK(); LOG(INFO) << "begin to convert delta version for schema changing. " << "base_tablet=" << base_tablet->full_name() @@ -1646,13 +1587,14 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, // a. Parse the Alter request and convert it into an internal representation // Do not use the delete condition specified by the DELETE_DATA command - RowBlockChanger rb_changer(new_tablet->tablet_schema()); + RowBlockChanger rb_changer(new_tablet->tablet_schema(), desc_tbl); bool sc_sorting = false; bool sc_directly = false; const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map; - if (!(res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly, - materialized_function_map))) { + if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly, + materialized_function_map, desc_tbl); + !res) { LOG(WARNING) << "failed to parse the request. res=" << res; return res; } @@ -1660,24 +1602,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, // NOTE split_table if row_block is used, the original block will become smaller // But since the historical data will become normal after the subsequent base/cumulative, it is also possible to use directly // b. 
Generate historical data converter - SchemaChange* sc_procedure = nullptr; - if (sc_sorting) { - LOG(INFO) << "doing schema change with sorting for base_tablet " - << base_tablet->full_name(); - sc_procedure = new (nothrow) SchemaChangeWithSorting( - rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); - } else if (sc_directly) { - LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name(); - sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer); - } else { - LOG(INFO) << "doing linked schema change for base_tablet " << base_tablet->full_name(); - sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer); - } - - if (sc_procedure == nullptr) { - LOG(FATAL) << "failed to malloc SchemaChange. size=" << sizeof(SchemaChangeWithSorting); - return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); - } + auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly); // c. Convert data DeleteHandler delete_handler; @@ -1697,6 +1622,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, reader_context.seek_columns = &return_columns; reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; + reader_context.is_vec = config::enable_vectorized_alter_table; RowsetReaderSharedPtr rowset_reader; RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader)); @@ -1709,8 +1635,19 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, (*base_rowset)->txn_id(), load_id, PREPARED, (*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer)); - if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, - base_tablet)) != Status::OK()) { + auto schema_version_convert_error = [&]() -> Status { + if (*new_rowset != nullptr) { + StorageEngine::instance()->add_unused_rowset(*new_rowset); + } + + LOG(WARNING) << "failed to convert rowsets. 
" + << " base_tablet=" << base_tablet->full_name() + << ", new_tablet=" << new_tablet->full_name() << " res = " << res; + return res; + }; + + if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet); + !res) { if ((*base_rowset)->is_pending()) { LOG(WARNING) << "failed to process the transaction when schema change. " << "tablet=" << new_tablet->full_name() << "'" @@ -1722,7 +1659,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, } new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); - goto SCHEMA_VERSION_CONVERT_ERR; + return schema_version_convert_error(); } *new_rowset = rowset_writer->build(); new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + @@ -1730,25 +1667,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, if (*new_rowset == nullptr) { LOG(WARNING) << "build rowset failed."; res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); - goto SCHEMA_VERSION_CONVERT_ERR; + return schema_version_convert_error(); } - SAFE_DELETE(sc_procedure); LOG(INFO) << "successfully convert rowsets. " << " base_tablet=" << base_tablet->full_name() << ", new_tablet=" << new_tablet->full_name(); return res; - -SCHEMA_VERSION_CONVERT_ERR: - if (*new_rowset != nullptr) { - StorageEngine::instance()->add_unused_rowset(*new_rowset); - } - - SAFE_DELETE(sc_procedure); - LOG(WARNING) << "failed to convert rowsets. 
" - << " base_tablet=" << base_tablet->full_name() - << ", new_tablet=" << new_tablet->full_name() << " res = " << res; - return res; } Status SchemaChangeHandler::_get_versions_to_be_changed( @@ -1782,42 +1707,41 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams // Add filter information in change, and filter column information will be set in _parse_request // And filter some data every time the row block changes - RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler); + RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler, + *sc_params.desc_tbl); bool sc_sorting = false; bool sc_directly = false; - SchemaChange* sc_procedure = nullptr; // a.Parse the Alter request and convert it into an internal representation - Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, - &sc_sorting, &sc_directly, sc_params.materialized_params_map); - if (!res.ok()) { + Status res = + _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, &sc_sorting, + &sc_directly, sc_params.materialized_params_map, *sc_params.desc_tbl); + + auto process_alter_exit = [&]() -> Status { + { + // save tablet meta here because rowset meta is not saved during add rowset + std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock()); + sc_params.new_tablet->save_meta(); + } + if (res) { + Version test_version(0, end_version); + res = sc_params.new_tablet->check_version_integrity(test_version); + } + + LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. " + << "base_tablet=" << sc_params.base_tablet->full_name() + << ", new_tablet=" << sc_params.new_tablet->full_name(); + return res; + }; + + if (!res) { LOG(WARNING) << "failed to parse the request. res=" << res; - goto PROCESS_ALTER_EXIT; + return process_alter_exit(); } // b. 
Generate historical data converter - if (sc_sorting) { - LOG(INFO) << "doing schema change with sorting for base_tablet " - << sc_params.base_tablet->full_name(); - sc_procedure = new (nothrow) SchemaChangeWithSorting( - rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); - } else if (sc_directly) { - LOG(INFO) << "doing schema change directly for base_tablet " - << sc_params.base_tablet->full_name(); - sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer); - } else { - LOG(INFO) << "doing linked schema change for base_tablet " - << sc_params.base_tablet->full_name(); - sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer); - } - - if (sc_procedure == nullptr) { - LOG(WARNING) << "failed to malloc SchemaChange. " - << "malloc_size=" << sizeof(SchemaChangeWithSorting); - res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR); - goto PROCESS_ALTER_EXIT; - } + auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly); // c.Convert historical data for (auto& rs_reader : sc_params.ref_rowset_readers) { @@ -1834,19 +1758,20 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams Status status = new_tablet->create_rowset_writer( rs_reader->version(), VISIBLE, rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer); - if (!status.ok()) { + if (!status) { res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT); - goto PROCESS_ALTER_EXIT; + return process_alter_exit(); } - if ((res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, - sc_params.base_tablet)) != Status::OK()) { + if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, + sc_params.base_tablet); + !res) { LOG(WARNING) << "failed to process the version." 
<< " version=" << rs_reader->version().first << "-" << rs_reader->version().second; new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); - goto PROCESS_ALTER_EXIT; + return process_alter_exit(); } new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); @@ -1856,7 +1781,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams RowsetSharedPtr new_rowset = rowset_writer->build(); if (new_rowset == nullptr) { LOG(WARNING) << "failed to build rowset, exit alter process"; - goto PROCESS_ALTER_EXIT; + return process_alter_exit(); } res = sc_params.new_tablet->add_rowset(new_rowset); if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { @@ -1865,13 +1790,13 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams << rs_reader->version().first << "-" << rs_reader->version().second; StorageEngine::instance()->add_unused_rowset(new_rowset); res = Status::OK(); - } else if (!res.ok()) { + } else if (!res) { LOG(WARNING) << "failed to register new version. " << " tablet=" << sc_params.new_tablet->full_name() << ", version=" << rs_reader->version().first << "-" << rs_reader->version().second; StorageEngine::instance()->add_unused_rowset(new_rowset); - goto PROCESS_ALTER_EXIT; + return process_alter_exit(); } else { VLOG_NOTICE << "register new version. 
tablet=" << sc_params.new_tablet->full_name() << ", version=" << rs_reader->version().first << "-" @@ -1882,22 +1807,9 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams << " version=" << rs_reader->version().first << "-" << rs_reader->version().second; } - // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version -PROCESS_ALTER_EXIT : { - // save tablet meta here because rowset meta is not saved during add rowset - std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock()); - sc_params.new_tablet->save_meta(); -} - if (res.ok()) { - Version test_version(0, end_version); - res = sc_params.new_tablet->check_version_integrity(test_version); - } - SAFE_DELETE(sc_procedure); - LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. " - << "base_tablet=" << sc_params.base_tablet->full_name() - << ", new_tablet=" << sc_params.new_tablet->full_name(); - return res; + // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version + return process_alter_exit(); } // @static @@ -1906,9 +1818,8 @@ Status SchemaChangeHandler::_parse_request( TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly, const std::unordered_map<std::string, AlterMaterializedViewParam>& - materialized_function_map) { - Status res = Status::OK(); - + materialized_function_map, + DescriptorTbl desc_tbl) { // set column mapping for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns(); i < new_schema_size; ++i) { @@ -1933,11 +1844,10 @@ Status SchemaChangeHandler::_parse_request( } if (materialized_function_map.find(column_name) != materialized_function_map.end()) { - AlterMaterializedViewParam mvParam = - materialized_function_map.find(column_name)->second; + auto mvParam = 
materialized_function_map.find(column_name)->second; column_mapping->materialized_function = mvParam.mv_expr; - std::string origin_column_name = mvParam.origin_column_name; - int32_t column_index = base_tablet->field_index(origin_column_name); + column_mapping->expr = mvParam.expr; + int32_t column_index = base_tablet->field_index(mvParam.origin_column_name); if (column_index >= 0) { column_mapping->ref_column = column_index; continue; @@ -1961,10 +1871,8 @@ Status SchemaChangeHandler::_parse_request( if (i < base_tablet->num_short_key_columns()) { *sc_directly = true; } - res = _init_column_mapping(column_mapping, new_column, new_column.default_value()); - if (!res) { - return res; - } + RETURN_IF_ERROR( + _init_column_mapping(column_mapping, new_column, new_column.default_value())); VLOG_TRACE << "A column with default value will be added after schema changing. " << "column=" << column_name << ", default_value=" << new_column.default_value(); @@ -2000,7 +1908,7 @@ Status SchemaChangeHandler::_parse_request( // If the sort of key has not been changed but the new keys num is less then base's, // the new table should be re agg. - // So we also need to set sc_sorting = true. + // So we also need to set sc_sorting = true. 
// A, B, C are keys(sort keys), D is value // followings need resort: // old keys: A B C D @@ -2025,23 +1933,12 @@ Status SchemaChangeHandler::_parse_request( if (column_mapping->ref_column < 0) { continue; } else { - if (new_tablet_schema.column(i).type() != - ref_tablet_schema.column(column_mapping->ref_column).type()) { - *sc_directly = true; - return Status::OK(); - } else if ((new_tablet_schema.column(i).type() == - ref_tablet_schema.column(column_mapping->ref_column).type()) && - (new_tablet_schema.column(i).length() != - ref_tablet_schema.column(column_mapping->ref_column).length())) { - *sc_directly = true; - return Status::OK(); - - } else if (new_tablet_schema.column(i).is_bf_column() != - ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) { - *sc_directly = true; - return Status::OK(); - } else if (new_tablet_schema.column(i).has_bitmap_index() != - ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) { + auto column_new = new_tablet_schema.column(i); + auto column_old = ref_tablet_schema.column(column_mapping->ref_column); + if (column_new.type() != column_old.type() || + column_new.length() != column_old.length() || + column_new.is_bf_column() != column_old.is_bf_column() || + column_new.has_bitmap_index() != column_old.has_bitmap_index()) { *sc_directly = true; return Status::OK(); } @@ -2049,7 +1946,7 @@ Status SchemaChangeHandler::_parse_request( } if (base_tablet->delete_predicates().size() != 0) { - //there exists delete condition in header, can't do linked schema change + // there exists delete condition in header, can't do linked schema change *sc_directly = true; } diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 5c8757613d..2f820ae79e 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -17,29 +17,17 @@ #pragma once -#include <deque> -#include <functional> -#include <queue> -#include <utility> -#include <vector> - +#include "common/status.h" #include 
"gen_cpp/AgentService_types.h" #include "olap/column_mapping.h" #include "olap/delete_handler.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" +#include "vec/columns/column.h" +#include "vec/core/block.h" namespace doris { -// defined in 'field.h' -class Field; -class FieldInfo; -// defined in 'tablet.h' -class Tablet; -// defined in 'row_block.h' -class RowBlock; -// defined in 'row_cursor.h' -class RowCursor; bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column, int field_idx, int ref_field_idx, MemPool* mem_pool); @@ -50,11 +38,12 @@ bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletCo class RowBlockChanger { public: - RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler); + RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler, + DescriptorTbl desc_tbl); - RowBlockChanger(const TabletSchema& tablet_schema); + RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl); - virtual ~RowBlockChanger(); + ~RowBlockChanger(); ColumnMapping* get_mutable_column_mapping(size_t column_index); @@ -70,6 +59,8 @@ private: // delete handler for filtering data which use specified in DELETE_DATA const DeleteHandler* _delete_handler = nullptr; + DescriptorTbl _desc_tbl; + DISALLOW_COPY_AND_ASSIGN(RowBlockChanger); }; @@ -94,20 +85,60 @@ public: SchemaChange() : _filtered_rows(0), _merged_rows(0) {} virtual ~SchemaChange() = default; - virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder, - TabletSharedPtr tablet, TabletSharedPtr base_tablet) = 0; - - void add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; } - - void add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; } + virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, 
TabletSharedPtr base_tablet) { + if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) { + RETURN_WITH_WARN_IF_ERROR( + rowset_writer->flush(), + Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR), + fmt::format("create empty version for schema change failed. version= {}-{}", + rowset_writer->version().first, rowset_writer->version().second)); + + return Status::OK(); + } + + _filtered_rows = 0; + _merged_rows = 0; + + RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet)); + _add_filtered_rows(rowset_reader->filtered_rows()); + + // Check row num changes + if (config::row_nums_check && !_check_row_nums(rowset_reader, *rowset_writer)) { + return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); + } + + LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() + << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() + << ", new_index_rows=" << rowset_writer->num_rows(); + return Status::OK(); + } uint64_t filtered_rows() const { return _filtered_rows; } uint64_t merged_rows() const { return _merged_rows; } - void reset_filtered_rows() { _filtered_rows = 0; } +protected: + void _add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; } - void reset_merged_rows() { _merged_rows = 0; } + void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; } + + virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) { + return Status::NotSupported("inner process unsupported."); + }; + + bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const { + if (reader->rowset()->num_rows() != writer.num_rows() + _merged_rows + _filtered_rows) { + LOG(WARNING) << "fail to check row num! 
" + << "source_rows=" << reader->rowset()->num_rows() + << ", merged_rows=" << merged_rows() + << ", filtered_rows=" << filtered_rows() + << ", new_index_rows=" << writer.num_rows(); + return false; + } + return true; + } private: uint64_t _filtered_rows; @@ -117,11 +148,11 @@ private: class LinkedSchemaChange : public SchemaChange { public: explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer) - : SchemaChange(), _row_block_changer(row_block_changer) {} - ~LinkedSchemaChange() {} + : _row_block_changer(row_block_changer) {} + ~LinkedSchemaChange() override = default; - virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; private: const RowBlockChanger& _row_block_changer; @@ -134,12 +165,12 @@ public: // @params tablet the instance of tablet which has new schema. 
// @params row_block_changer changer to modify the data of RowBlock explicit SchemaChangeDirectly(const RowBlockChanger& row_block_changer); - virtual ~SchemaChangeDirectly(); - - virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + ~SchemaChangeDirectly() override; private: + Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + const RowBlockChanger& _row_block_changer; RowBlockAllocator* _row_block_allocator; RowCursor* _cursor; @@ -154,12 +185,12 @@ class SchemaChangeWithSorting : public SchemaChange { public: explicit SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation); - virtual ~SchemaChangeWithSorting(); - - virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder, - TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + ~SchemaChangeWithSorting() override; private: + Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr, const Version& temp_delta_versions, TabletSharedPtr new_tablet, SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset); @@ -177,18 +208,26 @@ private: class SchemaChangeHandler { public: - static SchemaChangeHandler* instance() { - static SchemaChangeHandler instance; - return &instance; - } - - Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, - RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset); + static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, + RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset, + DescriptorTbl desc_tbl); // schema change v2, it will not set alter 
task in base tablet - Status process_alter_tablet_v2(const TAlterTabletReqV2& request); + static Status process_alter_tablet_v2(const TAlterTabletReqV2& request); + + static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer, + bool sc_sorting, bool sc_directly) { + if (sc_sorting) { + return std::make_unique<SchemaChangeWithSorting>( + rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes); + } + if (sc_directly) { + return std::make_unique<SchemaChangeDirectly>(rb_changer); + } + return std::make_unique<LinkedSchemaChange>(rb_changer); + } - bool tablet_in_converting(int64_t tablet_id); + static bool tablet_in_converting(int64_t tablet_id); private: // Check the status of schema change and clear information between "a pair" of Schema change tables @@ -196,17 +235,18 @@ private: // Returns: // Success: If there is historical information, then clear it if there is no problem; or no historical information // Failure: otherwise, if there is history information and it cannot be emptied (version has not been completed) - Status _check_and_clear_schema_change_info(TabletSharedPtr tablet, - const TAlterTabletReq& request); + static Status _check_and_clear_schema_change_info(TabletSharedPtr tablet, + const TAlterTabletReq& request); - Status _get_versions_to_be_changed(TabletSharedPtr base_tablet, - std::vector<Version>* versions_to_be_changed, - RowsetSharedPtr* max_rowset); + static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet, + std::vector<Version>* versions_to_be_changed, + RowsetSharedPtr* max_rowset); struct AlterMaterializedViewParam { std::string column_name; std::string origin_column_name; std::string mv_expr; + std::shared_ptr<TExpr> expr; }; struct SchemaChangeParams { @@ -216,31 +256,29 @@ private: std::vector<RowsetReaderSharedPtr> ref_rowset_readers; DeleteHandler* delete_handler = nullptr; std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map; + DescriptorTbl* 
desc_tbl = nullptr; + ObjectPool pool; }; - Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request); + static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request); - Status _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request); + static Status _validate_alter_result(TabletSharedPtr new_tablet, + const TAlterTabletReqV2& request); - Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); + static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly, const std::unordered_map<std::string, AlterMaterializedViewParam>& - materialized_function_map); + materialized_function_map, + DescriptorTbl desc_tbl); // Initialization Settings for creating a default value static Status _init_column_mapping(ColumnMapping* column_mapping, const TabletColumn& column_schema, const std::string& value); -private: - SchemaChangeHandler(); - virtual ~SchemaChangeHandler(); - SchemaChangeHandler(const SchemaChangeHandler&) = delete; - SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete; - - std::shared_mutex _mutex; - std::unordered_set<int64_t> _tablet_ids_in_converting; + static std::shared_mutex _mutex; + static std::unordered_set<int64_t> _tablet_ids_in_converting; }; using RowBlockDeleter = std::function<void(RowBlock*)>; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 85d8f7e4ad..771069e4ce 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -732,8 +732,7 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type) // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be // removed. So, we only need to do the compaction when it is being converted. // After being converted, tablet's state will be changed to TABLET_RUNNING. 
- auto schema_change_handler = SchemaChangeHandler::instance(); - return schema_change_handler->tablet_in_converting(tablet_id()); + return SchemaChangeHandler::tablet_in_converting(tablet_id()); } return true; diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 24496822d3..55ba6ab6e9 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -23,8 +23,6 @@ namespace doris { -using std::to_string; - EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( @@ -39,8 +37,7 @@ Status EngineAlterTabletTask::execute() { SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker); DorisMetrics::instance()->create_rollup_requests_total->increment(1); - auto schema_change_handler = SchemaChangeHandler::instance(); - Status res = schema_change_handler->process_alter_tablet_v2(_alter_tablet_req); + Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req); if (!res.ok()) { LOG(WARNING) << "failed to do alter task. res=" << res @@ -53,8 +50,8 @@ Status EngineAlterTabletTask::execute() { } LOG(INFO) << "success to create new alter tablet. 
res=" << res - << " base_tablet_id=" << _alter_tablet_req.base_tablet_id << ", base_schema_hash" - << _alter_tablet_req.base_schema_hash + << " base_tablet_id=" << _alter_tablet_req.base_tablet_id + << ", base_schema_hash=" << _alter_tablet_req.base_schema_hash << ", new_tablet_id=" << _alter_tablet_req.new_tablet_id << ", new_schema_hash=" << _alter_tablet_req.new_schema_hash; return res; diff --git a/be/src/olap/task/engine_alter_tablet_task.h b/be/src/olap/task/engine_alter_tablet_task.h index 1a2c0b3efa..7cc97395f1 100644 --- a/be/src/olap/task/engine_alter_tablet_task.h +++ b/be/src/olap/task/engine_alter_tablet_task.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H -#define DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H +#pragma once #include "gen_cpp/AgentService_types.h" #include "olap/olap_define.h" @@ -28,11 +27,11 @@ namespace doris { // add "Engine" as task prefix to prevent duplicate name with agent task class EngineAlterTabletTask : public EngineTask { public: - virtual Status execute(); + Status execute() override; public: EngineAlterTabletTask(const TAlterTabletReqV2& alter_tablet_request); - ~EngineAlterTabletTask() {} + ~EngineAlterTabletTask() = default; private: const TAlterTabletReqV2& _alter_tablet_req; @@ -41,4 +40,3 @@ private: }; // EngineTask } // namespace doris -#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H \ No newline at end of file diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp index fe00536662..118213b657 100644 --- a/be/src/olap/task/engine_storage_migration_task_v2.cpp +++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp @@ -50,8 +50,8 @@ Status EngineStorageMigrationTaskV2::execute() { } LOG(INFO) << "success to create new storage migration v2. 
res=" << res - << " base_tablet_id=" << _storage_migration_req.base_tablet_id << ", base_schema_hash" - << _storage_migration_req.base_schema_hash + << " base_tablet_id=" << _storage_migration_req.base_tablet_id + << ", base_schema_hash=" << _storage_migration_req.base_schema_hash << ", new_tablet_id=" << _storage_migration_req.new_tablet_id << ", new_schema_hash=" << _storage_migration_req.new_schema_hash; return res; diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp index c7a9a2188c..b7a50b6808 100644 --- a/be/src/olap/tuple_reader.cpp +++ b/be/src/olap/tuple_reader.cpp @@ -23,14 +23,10 @@ #include <unordered_set> #include "olap/collect_iterator.h" +#include "olap/olap_common.h" #include "olap/row.h" -#include "olap/row_block.h" #include "olap/row_cursor.h" -#include "olap/rowset/beta_rowset_reader.h" -#include "olap/schema.h" -#include "olap/storage_engine.h" #include "runtime/mem_pool.h" -#include "util/date_func.h" using std::nothrow; using std::set; diff --git a/be/src/olap/tuple_reader.h b/be/src/olap/tuple_reader.h index 844135a327..7045393e1b 100644 --- a/be/src/olap/tuple_reader.h +++ b/be/src/olap/tuple_reader.h @@ -20,26 +20,11 @@ #include <gen_cpp/PaloInternalService_types.h> #include <thrift/protocol/TDebugProtocol.h> -#include <list> -#include <memory> -#include <queue> -#include <sstream> -#include <stack> -#include <string> -#include <utility> -#include <vector> - -#include "exprs/bloomfilter_predicate.h" #include "olap/collect_iterator.h" -#include "olap/column_predicate.h" #include "olap/delete_handler.h" -#include "olap/olap_cond.h" -#include "olap/olap_define.h" #include "olap/reader.h" #include "olap/row_cursor.h" #include "olap/rowset/rowset_reader.h" -#include "olap/tablet.h" -#include "util/runtime_profile.h" namespace doris { diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index aa762048b7..4b81fc2cfe 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ 
-319,7 +319,9 @@ std::string TupleDescriptor::debug_string() const { RowDescriptor::RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples, const std::vector<bool>& nullable_tuples) : _tuple_idx_nullable_map(nullable_tuples) { - DCHECK(nullable_tuples.size() == row_tuples.size()); + DCHECK(nullable_tuples.size() == row_tuples.size()) + << "nullable_tuples size " << nullable_tuples.size() << " != row_tuples size " + << row_tuples.size(); DCHECK_GT(row_tuples.size(), 0); _num_materialized_slots = 0; _num_null_slots = 0; @@ -570,6 +572,7 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb } (*tbl)->_tuple_desc_map[tdesc.id] = desc; + (*tbl)->_row_tuples.emplace_back(tdesc.id); } for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) { @@ -622,16 +625,6 @@ SlotDescriptor* DescriptorTbl::get_slot_descriptor(SlotId id) const { } } -// return all registered tuple descriptors -void DescriptorTbl::get_tuple_descs(std::vector<TupleDescriptor*>* descs) const { - descs->clear(); - - for (TupleDescriptorMap::const_iterator i = _tuple_desc_map.begin(); i != _tuple_desc_map.end(); - ++i) { - descs->push_back(i->second); - } -} - bool SlotDescriptor::layout_equals(const SlotDescriptor& other_desc) const { if (type().type != other_desc.type().type) return false; if (is_nullable() != other_desc.is_nullable()) return false; diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index a806a537c5..ee18f8a450 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -149,7 +149,7 @@ private: class TableDescriptor { public: TableDescriptor(const TTableDescriptor& tdesc); - virtual ~TableDescriptor() {} + virtual ~TableDescriptor() = default; int num_cols() const { return _num_cols; } int num_clustering_cols() const { return _num_clustering_cols; } virtual std::string debug_string() const; @@ -173,14 +173,14 @@ private: class OlapTableDescriptor : public TableDescriptor { public: 
OlapTableDescriptor(const TTableDescriptor& tdesc); - virtual std::string debug_string() const; + std::string debug_string() const override; }; class SchemaTableDescriptor : public TableDescriptor { public: SchemaTableDescriptor(const TTableDescriptor& tdesc); - virtual ~SchemaTableDescriptor(); - virtual std::string debug_string() const; + ~SchemaTableDescriptor() override; + std::string debug_string() const override; TSchemaTableType::type schema_table_type() const { return _schema_table_type; } private: @@ -190,8 +190,8 @@ private: class BrokerTableDescriptor : public TableDescriptor { public: BrokerTableDescriptor(const TTableDescriptor& tdesc); - virtual ~BrokerTableDescriptor(); - virtual std::string debug_string() const; + ~BrokerTableDescriptor() override; + std::string debug_string() const override; private: }; @@ -199,8 +199,8 @@ private: class HiveTableDescriptor : public TableDescriptor { public: HiveTableDescriptor(const TTableDescriptor& tdesc); - virtual ~HiveTableDescriptor(); - virtual std::string debug_string() const; + ~HiveTableDescriptor() override; + std::string debug_string() const override; private: }; @@ -208,8 +208,8 @@ private: class IcebergTableDescriptor : public TableDescriptor { public: IcebergTableDescriptor(const TTableDescriptor& tdesc); - virtual ~IcebergTableDescriptor(); - virtual std::string debug_string() const; + ~IcebergTableDescriptor() override; + std::string debug_string() const override; private: }; @@ -217,8 +217,8 @@ private: class EsTableDescriptor : public TableDescriptor { public: EsTableDescriptor(const TTableDescriptor& tdesc); - virtual ~EsTableDescriptor(); - virtual std::string debug_string() const; + ~EsTableDescriptor() override; + std::string debug_string() const override; private: }; @@ -226,7 +226,7 @@ private: class MySQLTableDescriptor : public TableDescriptor { public: MySQLTableDescriptor(const TTableDescriptor& tdesc); - virtual std::string debug_string() const; + std::string debug_string() const 
override; const std::string mysql_db() const { return _mysql_db; } const std::string mysql_table() const { return _mysql_table; } const std::string host() const { return _host; } @@ -248,7 +248,7 @@ private: class ODBCTableDescriptor : public TableDescriptor { public: ODBCTableDescriptor(const TTableDescriptor& tdesc); - virtual std::string debug_string() const; + std::string debug_string() const override; const std::string db() const { return _db; } const std::string table() const { return _table; } const std::string host() const { return _host; } @@ -348,22 +348,32 @@ public: TableDescriptor* get_table_descriptor(TableId id) const; TupleDescriptor* get_tuple_descriptor(TupleId id) const; SlotDescriptor* get_slot_descriptor(SlotId id) const; + const std::vector<TTupleId>& get_row_tuples() const { return _row_tuples; } // return all registered tuple descriptors - void get_tuple_descs(std::vector<TupleDescriptor*>* descs) const; + std::vector<TupleDescriptor*> get_tuple_descs() const { + std::vector<TupleDescriptor*> descs; + + for (auto it : _tuple_desc_map) { + descs.push_back(it.second); + } + + return descs; + } std::string debug_string() const; private: - typedef std::unordered_map<TableId, TableDescriptor*> TableDescriptorMap; - typedef std::unordered_map<TupleId, TupleDescriptor*> TupleDescriptorMap; - typedef std::unordered_map<SlotId, SlotDescriptor*> SlotDescriptorMap; + using TableDescriptorMap = std::unordered_map<TableId, TableDescriptor*>; + using TupleDescriptorMap = std::unordered_map<TupleId, TupleDescriptor*>; + using SlotDescriptorMap = std::unordered_map<SlotId, SlotDescriptor*>; TableDescriptorMap _tbl_desc_map; TupleDescriptorMap _tuple_desc_map; SlotDescriptorMap _slot_desc_map; + std::vector<TTupleId> _row_tuples; - DescriptorTbl() : _tbl_desc_map(), _tuple_desc_map(), _slot_desc_map() {} + DescriptorTbl() = default; }; // Records positions of tuples within row produced by ExecNode. 
@@ -378,6 +388,11 @@ public: RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples, const std::vector<bool>& nullable_tuples); + static RowDescriptor create_default(const DescriptorTbl& desc_tbl, + const std::vector<bool>& nullable_tuples) { + return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples); + } + // standard copy c'tor, made explicit here RowDescriptor(const RowDescriptor& desc) : _tuple_desc_map(desc._tuple_desc_map), @@ -399,7 +414,7 @@ public: RowDescriptor(const RowDescriptor& lhs_row_desc, const RowDescriptor& rhs_row_desc); // dummy descriptor, needed for the JNI EvalPredicate() function - RowDescriptor() {} + RowDescriptor() = default; // Returns total size in bytes. // TODO: also take avg string lengths into account, ie, change this diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 482b56d378..6bbaabd1ea 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -130,6 +130,20 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals) TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj); } +RuntimeState::RuntimeState() + : _profile("<unnamed>"), + _obj_pool(new ObjectPool()), + _data_stream_recvrs_pool(new ObjectPool()), + _unreported_error_idx(0), + _is_cancelled(false), + _per_fragment_instance_idx(0) { + _query_options.batch_size = DEFAULT_BATCH_SIZE; + _timezone = TimezoneUtils::default_time_zone; + _timestamp_ms = 0; + TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj); + _exec_env = ExecEnv::GetInstance(); +} + RuntimeState::~RuntimeState() { _block_mgr2.reset(); // close error log file diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index eed32d8b82..ba07ecbf38 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -75,6 +75,9 @@ public: // RuntimeState for executing expr in fe-support. 
RuntimeState(const TQueryGlobals& query_globals); + // for job task only + RuntimeState(); + // Empty d'tor to avoid issues with unique_ptr. ~RuntimeState(); @@ -105,7 +108,7 @@ public: std::shared_ptr<ObjectPool> obj_pool_ptr() const { return _obj_pool; } const DescriptorTbl& desc_tbl() const { return *_desc_tbl; } - void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; } + void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; } int batch_size() const { return _query_options.batch_size; } bool abort_on_error() const { return _query_options.abort_on_error; } bool abort_on_default_limit_exceeded() const { @@ -188,7 +191,7 @@ public: int64_t backend_id() const { return _backend_id; } void set_be_number(int be_number) { _be_number = be_number; } - int be_number(void) { return _be_number; } + int be_number(void) const { return _be_number; } // Sets _process_status with err_msg if no error has been set yet. void set_process_status(const std::string& err_msg) { @@ -239,7 +242,7 @@ public: void set_load_job_id(int64_t job_id) { _load_job_id = job_id; } - const int64_t load_job_id() { return _load_job_id; } + const int64_t load_job_id() const { return _load_job_id; } // we only initialize object for load jobs void set_load_error_hub_info(const TLoadErrorHubInfo& hub_info) { @@ -312,17 +315,21 @@ public: ReservationTracker* instance_buffer_reservation() { return _instance_buffer_reservation.get(); } - int64_t min_reservation() { return _query_options.min_reservation; } + int64_t min_reservation() const { return _query_options.min_reservation; } - int64_t max_reservation() { return _query_options.max_reservation; } + int64_t max_reservation() const { return _query_options.max_reservation; } - bool disable_stream_preaggregations() { return _query_options.disable_stream_preaggregations; } + bool disable_stream_preaggregations() const { + return _query_options.disable_stream_preaggregations; + } bool enable_spill() const { return 
_query_options.enable_spilling; } - int32_t runtime_filter_wait_time_ms() { return _query_options.runtime_filter_wait_time_ms; } + int32_t runtime_filter_wait_time_ms() const { + return _query_options.runtime_filter_wait_time_ms; + } - int32_t runtime_filter_max_in_num() { return _query_options.runtime_filter_max_in_num; } + int32_t runtime_filter_max_in_num() const { return _query_options.runtime_filter_max_in_num; } bool enable_vectorized_exec() const { return _query_options.enable_vectorized_engine; } @@ -387,7 +394,7 @@ private: // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. RuntimeProfile _profile; - DescriptorTbl* _desc_tbl; + const DescriptorTbl* _desc_tbl; std::shared_ptr<ObjectPool> _obj_pool; // runtime filter diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index b476715612..e5b82fc939 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -24,8 +24,7 @@ namespace doris { -typedef void (*ERRCALLBACK)(); - +using ERRCALLBACK = void (*)(); struct ConsumeErrCallBackInfo { std::string cancel_msg; bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit. diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index ea8b952f8c..b0b6231cbb 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -166,7 +166,9 @@ public: /// Appends one element from other column with the same type multiple times. 
virtual void insert_many_from(const IColumn& src, size_t position, size_t length) { - for (size_t i = 0; i < length; ++i) insert_from(src, position); + for (size_t i = 0; i < length; ++i) { + insert_from(src, position); + } } /// Appends a batch elements from other column with the same type @@ -199,6 +201,12 @@ public: LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name(); } + void insert_many_data(const char* pos, size_t length, size_t data_num) { + for (size_t i = 0; i < data_num; ++i) { + insert_data(pos, length); + } + } + /// Appends "default value". /// Is used when there are need to increase column size, but inserting value doesn't make sense. /// For example, ColumnNullable(Nested) absolutely ignores values of nested column if it is marked as NULL. @@ -206,7 +214,9 @@ public: /// Appends "default value" multiple times. virtual void insert_many_defaults(size_t length) { - for (size_t i = 0; i < length; ++i) insert_default(); + for (size_t i = 0; i < length; ++i) { + insert_default(); + } } virtual void insert_elements(void* elements, size_t num) { diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index aeeb84c679..f929da979d 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -17,12 +17,9 @@ #include "vec/olap/block_reader.h" -#include "olap/row_block.h" -#include "olap/rowset/beta_rowset_reader.h" -#include "olap/schema.h" -#include "olap/storage_engine.h" +#include "common/status.h" +#include "olap/olap_common.h" #include "runtime/mem_pool.h" -#include "runtime/mem_tracker.h" #include "vec/aggregate_functions/aggregate_function_reader.h" #include "vec/olap/vcollect_iterator.h" diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 91ae5f670f..ad2d27d1d8 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -19,13 +19,9 @@ #include <parallel_hashmap/phmap.h> -#include "olap/collect_iterator.h" #include 
"olap/reader.h" #include "olap/rowset/rowset_reader.h" -#include "olap/tablet.h" #include "vec/aggregate_functions/aggregate_function.h" -#include "vec/aggregate_functions/aggregate_function_reader.h" -#include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/olap/vcollect_iterator.h" namespace doris { @@ -50,9 +46,6 @@ public: } private: - friend class VCollectIterator; - friend class DeleteHandler; - // Directly read row from rowset and pass to upper caller. No need to do aggregation. // This is usually used for DUPLICATE KEY tables Status _direct_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof); diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp index 7bbf3533ab..019c7635f8 100644 --- a/be/test/olap/schema_change_test.cpp +++ b/be/test/olap/schema_change_test.cpp @@ -996,7 +996,7 @@ TEST_F(TestColumn, ConvertIntToBitmap) { TabletSchema mv_tablet_schema; mv_tablet_schema.init_from_pb(mv_tablet_schema_pb); - RowBlockChanger row_block_changer(mv_tablet_schema); + RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl()); ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0); column_mapping->ref_column = 0; column_mapping = row_block_changer.get_mutable_column_mapping(1); @@ -1079,7 +1079,7 @@ TEST_F(TestColumn, ConvertCharToHLL) { TabletSchema mv_tablet_schema; mv_tablet_schema.init_from_pb(mv_tablet_schema_pb); - RowBlockChanger row_block_changer(mv_tablet_schema); + RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl()); ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0); column_mapping->ref_column = 0; column_mapping = row_block_changer.get_mutable_column_mapping(1); @@ -1160,7 +1160,7 @@ TEST_F(TestColumn, ConvertCharToCount) { TabletSchema mv_tablet_schema; mv_tablet_schema.init_from_pb(mv_tablet_schema_pb); - RowBlockChanger row_block_changer(mv_tablet_schema); + RowBlockChanger 
row_block_changer(mv_tablet_schema, DescriptorTbl()); ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0); column_mapping->ref_column = 0; column_mapping = row_block_changer.get_mutable_column_mapping(1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 9a66f10772..a05863d9ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -19,10 +19,13 @@ package org.apache.doris.alter; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.MVColumnItem; +import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -363,14 +366,22 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } } + List<Column> fullSchema = tbl.getBaseSchema(true); + DescriptorTable descTable = new DescriptorTable(); + for (Column column : fullSchema) { + TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); + SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); + destSlotDesc.setIsMaterialized(true); + destSlotDesc.setColumn(column); + destSlotDesc.setIsNullable(column.isAllowNull()); + } + List<Replica> rollupReplicas = rollupTablet.getReplicas(); for (Replica rollupReplica : rollupReplicas) { - AlterReplicaTask rollupTask = new AlterReplicaTask( - rollupReplica.getBackendId(), dbId, tableId, partitionId, - rollupIndexId, baseIndexId, - rollupTabletId, baseTabletId, rollupReplica.getId(), - 
rollupSchemaHash, baseSchemaHash, - visibleVersion, jobId, JobType.ROLLUP, defineExprs); + AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId, + partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, + rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, + JobType.ROLLUP, defineExprs, descTable); rollupBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 411ab275a5..4f38cdafda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -17,6 +17,11 @@ package org.apache.doris.alter; +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -375,13 +380,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.readLock(); try { + Map<String, Column> indexColumnMap = Maps.newHashMap(); + for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) { + for (Column column : entry.getValue()) { + indexColumnMap.put(column.getName(), column); + } + } + Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); Preconditions.checkNotNull(partition, partitionId); - // the schema change task will transform the data before visible version(included). + // the schema change task will transform the data before visible + // version(included). 
long visibleVersion = partition.getVisibleVersion(); Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId); @@ -389,6 +402,32 @@ public class SchemaChangeJobV2 extends AlterJobV2 { long shadowIdxId = entry.getKey(); MaterializedIndex shadowIdx = entry.getValue(); + Map<String, Expr> defineExprs = Maps.newHashMap(); + + List<Column> fullSchema = tbl.getBaseSchema(true); + DescriptorTable descTable = new DescriptorTable(); + for (Column column : fullSchema) { + TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); + SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); + destSlotDesc.setIsMaterialized(true); + destSlotDesc.setColumn(column); + destSlotDesc.setIsNullable(column.isAllowNull()); + + if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName())) { + Column newColumn = indexColumnMap + .get(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName()); + if (newColumn.getType() != column.getType()) { + try { + defineExprs.put(column.getName(), + new SlotRef(destSlotDesc).castTo(newColumn.getType())); + } catch (AnalysisException e) { + throw new AlterCancelException(e.getMessage()); + } + } + } + + } + long originIdxId = indexIdMap.get(shadowIdxId); int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId)); @@ -398,12 +437,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId); List<Replica> shadowReplicas = shadowTablet.getReplicas(); for (Replica shadowReplica : shadowReplicas) { - AlterReplicaTask rollupTask = new AlterReplicaTask( - shadowReplica.getBackendId(), dbId, tableId, partitionId, - shadowIdxId, originIdxId, - shadowTabletId, originTabletId, shadowReplica.getId(), - shadowSchemaHash, originSchemaHash, - visibleVersion, jobId, JobType.SCHEMA_CHANGE); + 
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, + tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, + shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, + JobType.SCHEMA_CHANGE, defineExprs, descTable); schemaChangeBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index b965ec4b38..e94e3ef290 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -800,11 +800,8 @@ public class Analyzer { } result = globalState.descTbl.addSlotDescriptor(d); result.setColumn(col); - if (col.isAllowNull() || isOuterJoined(d.getId())) { - result.setIsNullable(true); - } else { - result.setIsNullable(false); - } + result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId())); + slotRefMap.put(key, result); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 7dd92af01c..579d9d1512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -352,11 +352,7 @@ public class InsertStmt extends DdlStmt { slotDesc.setIsMaterialized(true); slotDesc.setType(col.getType()); slotDesc.setColumn(col); - if (col.isAllowNull()) { - slotDesc.setIsNullable(true); - } else { - slotDesc.setIsNullable(false); - } + slotDesc.setIsNullable(col.isAllowNull()); } // will use it during create load job indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 240edf92fc..b91c839441 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -112,11 +112,7 @@ public class LoadingTaskPlanner { SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc); slotDesc.setIsMaterialized(true); slotDesc.setColumn(col); - if (col.isAllowNull()) { - slotDesc.setIsNullable(true); - } else { - slotDesc.setIsNullable(false); - } + slotDesc.setIsNullable(col.isAllowNull()); } // Generate plan trees diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 37bae6bd07..cbc8f64f1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -881,11 +881,7 @@ public class SparkLoadJob extends BulkLoadJob { SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); destSlotDesc.setIsMaterialized(true); destSlotDesc.setColumn(column); - if (column.isAllowNull()) { - destSlotDesc.setIsNullable(true); - } else { - destSlotDesc.setIsNullable(false); - } + destSlotDesc.setIsNullable(column.isAllowNull()); } initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc); initTDescriptorTable(descTable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java index 501f5bcc76..887e2b33e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java @@ -113,11 +113,7 @@ public class UpdatePlanner extends Planner { slotDesc.setIsMaterialized(true); slotDesc.setType(col.getType()); slotDesc.setColumn(col); - if (col.isAllowNull()) { - slotDesc.setIsNullable(true); - } else { - slotDesc.setIsNullable(false); - } + 
slotDesc.setIsNullable(col.isAllowNull()); } targetTupleDesc.computeStatAndMemLayout(); return targetTupleDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index f25b01d2f1..4235ce0ded 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -18,6 +18,7 @@ package org.apache.doris.task; import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotRef; import org.apache.doris.thrift.TAlterMaterializedViewParam; @@ -46,21 +47,16 @@ public class AlterReplicaTask extends AgentTask { private AlterJobV2.JobType jobType; private Map<String, Expr> defineExprs; - - public AlterReplicaTask(long backendId, long dbId, long tableId, - long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, - long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, - long version, long jobId, AlterJobV2.JobType jobType) { - this(backendId, dbId, tableId, partitionId, - rollupIndexId, baseIndexId, rollupTabletId, - baseTabletId, newReplicaId, newSchemaHash, baseSchemaHash, - version, jobId, jobType, null); - } - - public AlterReplicaTask(long backendId, long dbId, long tableId, - long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, - long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, - long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs) { + private DescriptorTable descTable; + + /** + * AlterReplicaTask constructor. 
+ * + */ + public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, + long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, + int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs, + DescriptorTable descTable) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); this.baseTabletId = baseTabletId; @@ -74,6 +70,7 @@ public class AlterReplicaTask extends AgentTask { this.jobType = jobType; this.defineExprs = defineExprs; + this.descTable = descTable; } public long getBaseTabletId() { @@ -117,6 +114,7 @@ public class AlterReplicaTask extends AgentTask { req.addToMaterializedViewParams(mvParam); } } + req.setDescTbl(descTable.toThrift()); return req; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java index 44e6e2ec62..7ffc14dd06 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java @@ -182,11 +182,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } TStreamLoadPutRequest request = getBaseRequest(); @@ -230,11 +226,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } TStreamLoadPutRequest request = getBaseRequest(); @@ -259,11 +251,7 @@ public class 
StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } TStreamLoadPutRequest request = getBaseRequest(); @@ -288,11 +276,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -332,11 +316,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -379,11 +359,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -434,11 +410,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } TStreamLoadPutRequest request = getBaseRequest(); @@ -464,11 +436,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + 
slot.setIsNullable(column.isAllowNull()); } TStreamLoadPutRequest request = getBaseRequest(); @@ -494,11 +462,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -548,11 +512,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -594,11 +554,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -646,11 +602,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -701,11 +653,7 @@ public class StreamLoadScanNodeTest { SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -757,11 +705,7 @@ public class StreamLoadScanNodeTest { System.out.println(column); slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } 
else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { @@ -823,11 +767,7 @@ public class StreamLoadScanNodeTest { slot.setColumn(column); slot.setIsMaterialized(true); - if (column.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } + slot.setIsNullable(column.isAllowNull()); } new Expectations() { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index e695d7a737..740db1fd08 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -151,6 +151,7 @@ struct TAlterTabletReqV2 { 6: optional Types.TVersionHash alter_version_hash // Deprecated 7: optional list<TAlterMaterializedViewParam> materialized_view_params 8: optional TAlterTabletType alter_tablet_type = TAlterTabletType.SCHEMA_CHANGE + 9: optional Descriptors.TDescriptorTable desc_tbl } struct TAlterMaterializedViewParam { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org