This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new d7893f0 [Bug]Fix some schema change not work right (#4009) d7893f0 is described below commit d7893f0fa7c9f68a50f438080891dacff0ad1408 Author: WingC <1018957...@qq.com> AuthorDate: Fri Jul 10 21:18:29 2020 -0500 [Bug]Fix some schema change not work right (#4009) [Bug]Fix some schema change not work right This CL mainly fixes some schema changes to the varchar type that did not work correctly because the corresponding logic check had been forgotten. It also adds ConvertTypeResolver, which registers the supported type conversions in one place so that this logic check cannot be forgotten again. --- be/src/olap/schema_change.cpp | 708 +++++++++++---------- be/src/olap/schema_change.h | 130 ++-- .../sql-statements/Data Definition/ALTER TABLE.md | 1 - .../sql-statements/Data Definition/ALTER TABLE.md | 1 - .../main/java/org/apache/doris/catalog/Column.java | 6 + .../java/org/apache/doris/catalog/ColumnType.java | 9 +- 6 files changed, 419 insertions(+), 436 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 54b9794..4458081 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -23,21 +23,21 @@ #include <algorithm> #include <vector> +#include "agent/cgroups_mgr.h" +#include "common/resource_tls.h" #include "olap/merger.h" -#include "olap/storage_engine.h" -#include "olap/tablet.h" +#include "olap/row.h" #include "olap/row_block.h" #include "olap/row_cursor.h" -#include "olap/wrapper_field.h" -#include "olap/row.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_id_generator.h" -#include "runtime/mem_pool.h" -#include "runtime/mem_tracker.h" -#include "common/resource_tls.h" -#include "agent/cgroups_mgr.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/wrapper_field.h" #include "runtime/exec_env.h" #include "runtime/heartbeat_flags.h" +#include "runtime/mem_pool.h" +#include "runtime/mem_tracker.h" using std::deque; using std::list; @@ -70,10 +70,8 @@ public: explicit 
RowBlockMerger(TabletSharedPtr tablet); virtual ~RowBlockMerger(); - bool merge( - const std::vector<RowBlock*>& row_block_arr, - RowsetWriter* rowset_writer, - uint64_t* merged_rows); + bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer, + uint64_t* merged_rows); private: struct MergeElement { @@ -93,9 +91,8 @@ private: std::priority_queue<MergeElement> _heap; }; - RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, - const TabletSharedPtr &base_tablet) { + const TabletSharedPtr& base_tablet) { _schema_mapping.resize(tablet_schema.num_columns()); } @@ -124,81 +121,141 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index) return &(_schema_mapping[column_index]); } -#define TYPE_REINTERPRET_CAST(FromType, ToType) \ -{ \ - size_t row_num = ref_block->row_block_info().row_num; \ - for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \ - if (is_data_left_vec[row] != 0) { \ - char* ref_ptr = ref_block->field_ptr(row, ref_column); \ - char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \ - *new_ptr = *ref_ptr; \ - *(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \ - } \ - } \ - break; \ -} +#define TYPE_REINTERPRET_CAST(FromType, ToType) \ + { \ + size_t row_num = ref_block->row_block_info().row_num; \ + for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \ + if (is_data_left_vec[row] != 0) { \ + char* ref_ptr = ref_block->field_ptr(row, ref_column); \ + char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \ + *new_ptr = *ref_ptr; \ + *(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \ + } \ + } \ + break; \ + } + +#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \ + { \ + size_t row_num = ref_block->row_block_info().row_num; \ + for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \ + if (is_data_left_vec[row] != 0) { \ + char* ref_ptr = ref_block->field_ptr(row, ref_column); \ + char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \ 
+ *new_ptr = *ref_ptr; \ + ToType new_value = *(FromType*)(ref_ptr + 1); \ + memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \ + } \ + } \ + break; \ + } + +#define CONVERT_FROM_TYPE(from_type) \ + { \ + switch (mutable_block->tablet_schema().column(i).type()) { \ + case OLAP_FIELD_TYPE_TINYINT: \ + TYPE_REINTERPRET_CAST(from_type, int8_t); \ + case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: \ + TYPE_REINTERPRET_CAST(from_type, uint8_t); \ + case OLAP_FIELD_TYPE_SMALLINT: \ + TYPE_REINTERPRET_CAST(from_type, int16_t); \ + case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: \ + TYPE_REINTERPRET_CAST(from_type, uint16_t); \ + case OLAP_FIELD_TYPE_INT: \ + TYPE_REINTERPRET_CAST(from_type, int32_t); \ + case OLAP_FIELD_TYPE_UNSIGNED_INT: \ + TYPE_REINTERPRET_CAST(from_type, uint32_t); \ + case OLAP_FIELD_TYPE_BIGINT: \ + TYPE_REINTERPRET_CAST(from_type, int64_t); \ + case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: \ + TYPE_REINTERPRET_CAST(from_type, uint64_t); \ + case OLAP_FIELD_TYPE_LARGEINT: \ + LARGEINT_REINTERPRET_CAST(from_type, int128_t); \ + case OLAP_FIELD_TYPE_DOUBLE: \ + TYPE_REINTERPRET_CAST(from_type, double); \ + default: \ + LOG(WARNING) << "the column type which was altered to was unsupported." 
\ + << " origin_type=" \ + << ref_block->tablet_schema().column(ref_column).type() \ + << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); \ + return false; \ + } \ + break; \ + } + +#define ASSIGN_DEFAULT_VALUE(length) \ + case length: { \ + for (size_t row = 0; row < ref_block.row_block_info().row_num; ++row) { \ + memcpy(buf, _schema_mapping[i].default_value->ptr(), length); \ + buf += length; \ + } \ + break; \ + } -#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \ -{ \ - size_t row_num = ref_block->row_block_info().row_num; \ - for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \ - if (is_data_left_vec[row] != 0) { \ - char* ref_ptr = ref_block->field_ptr(row, ref_column); \ - char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \ - *new_ptr = *ref_ptr; \ - ToType new_value = *(FromType*)(ref_ptr + 1); \ - memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \ - } \ - } \ - break; \ -} +struct ConvertTypeMapHash { + size_t operator()(const std::pair<FieldType, FieldType>& pair) const { + return (pair.first + 31) ^ pair.second; + } +}; -#define CONVERT_FROM_TYPE(from_type) \ -{ \ - switch (mutable_block->tablet_schema().column(i).type()) {\ - case OLAP_FIELD_TYPE_TINYINT: \ - TYPE_REINTERPRET_CAST(from_type, int8_t); \ - case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: \ - TYPE_REINTERPRET_CAST(from_type, uint8_t); \ - case OLAP_FIELD_TYPE_SMALLINT: \ - TYPE_REINTERPRET_CAST(from_type, int16_t); \ - case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: \ - TYPE_REINTERPRET_CAST(from_type, uint16_t); \ - case OLAP_FIELD_TYPE_INT: \ - TYPE_REINTERPRET_CAST(from_type, int32_t); \ - case OLAP_FIELD_TYPE_UNSIGNED_INT: \ - TYPE_REINTERPRET_CAST(from_type, uint32_t); \ - case OLAP_FIELD_TYPE_BIGINT: \ - TYPE_REINTERPRET_CAST(from_type, int64_t); \ - case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: \ - TYPE_REINTERPRET_CAST(from_type, uint64_t); \ - case OLAP_FIELD_TYPE_LARGEINT: \ - LARGEINT_REINTERPRET_CAST(from_type, int128_t); \ - case OLAP_FIELD_TYPE_DOUBLE: \ - 
TYPE_REINTERPRET_CAST(from_type, double); \ - default: \ - LOG(WARNING) << "the column type which was altered to was unsupported." \ - << " origin_type=" << ref_block->tablet_schema().column(ref_column).type() \ - << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); \ - return false; \ - } \ - break; \ -} +class ConvertTypeResolver { + DECLARE_SINGLETON(ConvertTypeResolver); -#define ASSIGN_DEFAULT_VALUE(length) \ - case length: { \ - for (size_t row = 0; row < ref_block.row_block_info().row_num; ++row) { \ - memcpy(buf, _schema_mapping[i].default_value->ptr(), length); \ - buf += length; \ - } \ - break; \ +public: + bool get_convert_type_info(const FieldType from_type, const FieldType to_type) const { + return _convert_type_set.find(std::make_pair(from_type, to_type)) != + _convert_type_set.end(); + } + + template <FieldType from_type, FieldType to_type> + void add_convert_type_mapping() { + _convert_type_set.emplace(std::make_pair(from_type, to_type)); } -bool RowBlockChanger::change_row_block( - const RowBlock* ref_block, - int32_t data_version, - RowBlock* mutable_block, - uint64_t* filtered_rows) const { +private: + typedef std::pair<FieldType, FieldType> convert_type_pair; + std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set; + + DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver); +}; + +ConvertTypeResolver::ConvertTypeResolver() { + // supported type convert should annotate in doc: + // http://doris.incubator.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description + // If type convert is supported here, you should check fe/src/main/java/org/apache/doris/catalog/ColumnType.java to supported it either + // from varchar type + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_SMALLINT>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_INT>(); + 
add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_BIGINT>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_LARGEINT>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_FLOAT>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DOUBLE>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DATE>(); + + // to varchar type + add_convert_type_mapping<OLAP_FIELD_TYPE_TINYINT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_SMALLINT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_BIGINT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_LARGEINT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_DOUBLE, OLAP_FIELD_TYPE_VARCHAR>(); + add_convert_type_mapping<OLAP_FIELD_TYPE_DECIMAL, OLAP_FIELD_TYPE_VARCHAR>(); + + add_convert_type_mapping<OLAP_FIELD_TYPE_DATE, OLAP_FIELD_TYPE_DATETIME>(); + + add_convert_type_mapping<OLAP_FIELD_TYPE_DATETIME, OLAP_FIELD_TYPE_DATE>(); + + add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_DOUBLE>(); + + add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>(); +} + +ConvertTypeResolver::~ConvertTypeResolver() {} + +bool RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data_version, + RowBlock* mutable_block, uint64_t* filtered_rows) const { if (mutable_block == nullptr) { LOG(FATAL) << "mutable block is uninitialized."; return false; @@ -262,7 +319,8 @@ bool RowBlockChanger::change_row_block( MemPool* mem_pool = mutable_block->mem_pool(); // b. 
根据前面的过滤信息,只对还标记为1的处理 - for (size_t i = 0, len = mutable_block->tablet_schema().num_columns(); !filter_all && i < len; ++i) { + for (size_t i = 0, len = mutable_block->tablet_schema().num_columns(); !filter_all && i < len; + ++i) { int32_t ref_column = _schema_mapping[i].ref_column; if (_schema_mapping[i].ref_column >= 0) { @@ -273,7 +331,7 @@ bool RowBlockChanger::change_row_block( if (newtype == reftype) { // 效率低下,也可以直接计算变长域拷贝,但仍然会破坏封装 for (size_t row_index = 0, new_row_index = 0; - row_index < ref_block->row_block_info().row_num; ++row_index) { + row_index < ref_block->row_block_info().row_num; ++row_index) { // 不需要的row,每次处理到这个row时就跳过 if (need_filter_data && is_data_left_vec[row_index] == 0) { continue; @@ -286,7 +344,6 @@ bool RowBlockChanger::change_row_block( if (true == read_helper.is_null(ref_column)) { write_helper.set_null(i); } else { - write_helper.set_not_null(i); if (newtype == OLAP_FIELD_TYPE_CHAR) { // if modify length of CHAR type, the size of slice should be equal @@ -298,7 +355,8 @@ bool RowBlockChanger::change_row_block( size_t copy_size = (size < src->size) ? 
size : src->size; memcpy(buf, src->data, copy_size); Slice dst(buf, size); - write_helper.set_field_content(i, reinterpret_cast<char*>(&dst), mem_pool); + write_helper.set_field_content(i, reinterpret_cast<char*>(&dst), + mem_pool); } else { char* src = read_helper.cell_ptr(ref_column); write_helper.set_field_content(i, src, mem_pool); @@ -310,7 +368,7 @@ bool RowBlockChanger::change_row_block( } else if (newtype == OLAP_FIELD_TYPE_VARCHAR && reftype == OLAP_FIELD_TYPE_CHAR) { // 效率低下,也可以直接计算变长域拷贝,但仍然会破坏封装 for (size_t row_index = 0, new_row_index = 0; - row_index < ref_block->row_block_info().row_num; ++row_index) { + row_index < ref_block->row_block_info().row_num; ++row_index) { // 不需要的row,每次处理到这个row时就跳过 if (need_filter_data && is_data_left_vec[row_index] == 0) { continue; @@ -337,20 +395,9 @@ bool RowBlockChanger::change_row_block( write_helper.set_field_content(i, reinterpret_cast<char*>(slice), mem_pool); } } - } else if ((reftype == OLAP_FIELD_TYPE_FLOAT && newtype == OLAP_FIELD_TYPE_DOUBLE) - || (reftype == OLAP_FIELD_TYPE_INT && newtype == OLAP_FIELD_TYPE_DATE) - || (reftype == OLAP_FIELD_TYPE_DATE && newtype == OLAP_FIELD_TYPE_DATETIME) - || (reftype == OLAP_FIELD_TYPE_DATETIME && newtype == OLAP_FIELD_TYPE_DATE) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_DATE) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_FLOAT) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_DOUBLE) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_TINYINT) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_SMALLINT) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_INT) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_BIGINT) - || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_LARGEINT)) { + } else if (ConvertTypeResolver::instance()->get_convert_type_info(reftype, newtype)) { for (size_t row_index = 0, 
new_row_index = 0; - row_index < ref_block->row_block_info().row_num; ++row_index) { + row_index < ref_block->row_block_info().row_num; ++row_index) { // Skip filtered rows if (need_filter_data && is_data_left_vec[row_index] == 0) { continue; @@ -363,10 +410,13 @@ bool RowBlockChanger::change_row_block( write_helper.set_not_null(i); const Field* ref_field = read_helper.column_schema(ref_column); char* ref_value = read_helper.cell_ptr(ref_column); - OLAPStatus st = write_helper.convert_from(i, ref_value, ref_field->type_info(), mem_pool); + OLAPStatus st = write_helper.convert_from(i, ref_value, + ref_field->type_info(), mem_pool); if (st != OLAPStatus::OLAP_SUCCESS) { - LOG(WARNING) << "the column type which was altered from was unsupported." - << "status:" << st << ", from_type=" << reftype << ", to_type=" << newtype; + LOG(WARNING) + << "the column type which was altered from was unsupported." + << "status:" << st << ", from_type=" << reftype + << ", to_type=" << newtype; return false; } } @@ -394,21 +444,23 @@ bool RowBlockChanger::change_row_block( CONVERT_FROM_TYPE(uint64_t); default: LOG(WARNING) << "the column type which was altered from was unsupported." - << " from_type=" << ref_block->tablet_schema().column(ref_column).type(); + << " from_type=" + << ref_block->tablet_schema().column(ref_column).type(); return false; } if (newtype < reftype) { VLOG(3) << "type degraded while altering column. 
" << "column=" << mutable_block->tablet_schema().column(i).name() - << ", origin_type=" << ref_block->tablet_schema().column(ref_column).type() + << ", origin_type=" + << ref_block->tablet_schema().column(ref_column).type() << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); } } } else { // 新增列,写入默认值 for (size_t row_index = 0, new_row_index = 0; - row_index < ref_block->row_block_info().row_num; ++row_index) { + row_index < ref_block->row_block_info().row_num; ++row_index) { // 不需要的row,每次处理到这个row时就跳过 if (need_filter_data && is_data_left_vec[row_index] == 0) { continue; @@ -420,8 +472,8 @@ bool RowBlockChanger::change_row_block( write_helper.set_null(i); } else { write_helper.set_not_null(i); - write_helper.set_field_content( - i, _schema_mapping[i].default_value->ptr(), mem_pool); + write_helper.set_field_content(i, _schema_mapping[i].default_value->ptr(), + mem_pool); } } } @@ -438,9 +490,8 @@ bool RowBlockChanger::change_row_block( #undef TYPE_REINTERPRET_CAST #undef ASSIGN_DEFAULT_VALUE -RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator) : - _row_block_allocator(row_block_allocator), - _swap_row_block(nullptr) {} +RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator) + : _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {} RowBlockSorter::~RowBlockSorter() { if (_swap_row_block) { @@ -459,8 +510,9 @@ bool RowBlockSorter::sort(RowBlock** row_block) { _swap_row_block = nullptr; } - if (_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported) != OLAP_SUCCESS - || _swap_row_block == nullptr) { + if (_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported) != + OLAP_SUCCESS || + _swap_row_block == nullptr) { LOG(WARNING) << "fail to allocate memory."; return false; } @@ -478,7 +530,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) { // create an list of row cursor as long as the number of rows in data block. 
for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) { - if ((row_cursor_list[i] = new(nothrow) RowCursor()) == nullptr) { + if ((row_cursor_list[i] = new (nothrow) RowCursor()) == nullptr) { LOG(WARNING) << "failed to malloc RowCursor. size=" << sizeof(RowCursor); goto SORT_ERR_EXIT; } @@ -521,11 +573,10 @@ SORT_ERR_EXIT: return false; } -RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, - size_t memory_limitation) : - _tablet_schema(tablet_schema), - _memory_allocated(0), - _memory_limitation(memory_limitation) { +RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation) + : _tablet_schema(tablet_schema), + _memory_allocated(0), + _memory_limitation(memory_limitation) { _row_len = 0; _row_len = tablet_schema.row_size(); @@ -538,13 +589,10 @@ RowBlockAllocator::~RowBlockAllocator() { } } -OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, - size_t num_rows, - bool null_supported) { +OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool null_supported) { size_t row_block_size = _row_len * num_rows; - if (_memory_limitation > 0 - && _memory_allocated + row_block_size > _memory_limitation) { + if (_memory_limitation > 0 && _memory_allocated + row_block_size > _memory_limitation) { VLOG(3) << "RowBlockAllocator::alocate() memory exceeded. " << "m_memory_allocated=" << _memory_allocated; *row_block = nullptr; @@ -552,7 +600,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, } // TODO(lijiao) : 为什么舍弃原有的m_row_block_buffer - *row_block = new(nothrow) RowBlock(&_tablet_schema); + *row_block = new (nothrow) RowBlock(&_tablet_schema); if (*row_block == nullptr) { LOG(WARNING) << "failed to malloc RowBlock. 
size=" << sizeof(RowBlock); @@ -570,10 +618,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, } _memory_allocated += row_block_size; - VLOG(3) << "RowBlockAllocator::allocate() this=" << this - << ", num_rows=" << num_rows - << ", m_memory_allocated=" << _memory_allocated - << ", row_block_addr=" << *row_block; + VLOG(3) << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows + << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << *row_block; return res; } @@ -587,8 +633,7 @@ void RowBlockAllocator::release(RowBlock* row_block) { VLOG(3) << "RowBlockAllocator::release() this=" << this << ", num_rows=" << row_block->capacity() - << ", m_memory_allocated=" << _memory_allocated - << ", row_block_addr=" << row_block; + << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block; delete row_block; } @@ -596,10 +641,8 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {} RowBlockMerger::~RowBlockMerger() {} -bool RowBlockMerger::merge( - const vector<RowBlock*>& row_block_arr, - RowsetWriter* rowset_writer, - uint64_t* merged_rows) { +bool RowBlockMerger::merge(const vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer, + uint64_t* merged_rows) { uint64_t tmp_merged_rows = 0; RowCursor row_cursor; std::unique_ptr<MemTracker> tracker(new MemTracker(-1)); @@ -614,7 +657,8 @@ bool RowBlockMerger::merge( row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); while (_heap.size() > 0) { - init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(), agg_object_pool.get()); + init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(), + agg_object_pool.get()); if (!_pop_heap()) { goto MERGE_ERR; @@ -671,7 +715,7 @@ bool RowBlockMerger::_make_heap(const vector<RowBlock*>& row_block_arr) { MergeElement element; element.row_block = row_block; element.row_block_index = 0; - element.row_cursor = new(nothrow) RowCursor(); + 
element.row_cursor = new (nothrow) RowCursor(); if (element.row_cursor == nullptr) { LOG(FATAL) << "failed to malloc RowCursor. size=" << sizeof(RowCursor); @@ -707,30 +751,25 @@ bool RowBlockMerger::_pop_heap() { return true; } -bool LinkedSchemaChange::process( - RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { +bool LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet) { OLAPStatus status = new_rowset_writer->add_rowset_for_linked_schema_change( - rowset_reader->rowset(), _row_block_changer.get_schema_mapping()); + rowset_reader->rowset(), _row_block_changer.get_schema_mapping()); if (status != OLAP_SUCCESS) { LOG(WARNING) << "fail to convert rowset." << ", new_tablet=" << new_tablet->full_name() << ", base_tablet=" << base_tablet->full_name() - << ", version=" << new_rowset_writer->version().first - << "-" << new_rowset_writer->version().second; + << ", version=" << new_rowset_writer->version().first << "-" + << new_rowset_writer->version().second; return false; } return true; } -SchemaChangeDirectly::SchemaChangeDirectly( - const RowBlockChanger& row_block_changer) : - _row_block_changer(row_block_changer), - _row_block_allocator(nullptr), - _cursor(nullptr) { } +SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer) + : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {} SchemaChangeDirectly::~SchemaChangeDirectly() { VLOG(3) << "~SchemaChangeDirectly()"; @@ -751,8 +790,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc } bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) { if (_row_block_allocator == 
nullptr) { _row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0); if (_row_block_allocator == nullptr) { @@ -762,7 +800,7 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr } if (nullptr == _cursor) { - _cursor = new(nothrow) RowCursor(); + _cursor = new (nothrow) RowCursor(); if (nullptr == _cursor) { LOG(WARNING) << "fail to allocate row cursor."; return false; @@ -790,7 +828,8 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr res = rowset_writer->flush(); if (res != OLAP_SUCCESS) { LOG(WARNING) << "create empty version for schema change failed." - << "version=" << rowset_writer->version().first << "-" << rowset_writer->version().second; + << "version=" << rowset_writer->version().first << "-" + << rowset_writer->version().second; return false; } return true; @@ -809,17 +848,16 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr rowset_reader->next_block(&ref_row_block); while (ref_row_block != nullptr && ref_row_block->has_remaining()) { // 注意这里强制分配和旧块等大的块(小了可能会存不下) - if (new_row_block == nullptr - || new_row_block->capacity() < ref_row_block->row_block_info().row_num) { + if (new_row_block == nullptr || + new_row_block->capacity() < ref_row_block->row_block_info().row_num) { if (new_row_block != nullptr) { _row_block_allocator->release(new_row_block); new_row_block = nullptr; } - if (OLAP_SUCCESS != _row_block_allocator->allocate( - &new_row_block, - ref_row_block->row_block_info().row_num, - true)) { + if (OLAP_SUCCESS != + _row_block_allocator->allocate(&new_row_block, + ref_row_block->row_block_info().row_num, true)) { LOG(WARNING) << "failed to allocate RowBlock."; result = false; goto DIRECTLY_PROCESS_ERR; @@ -830,10 +868,8 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr // 将ref改为new。这一步按道理来说确实需要等大的块,但理论上和writer无关。 uint64_t filtered_rows = 0; - if (!_row_block_changer.change_row_block(ref_row_block, - 
rowset_reader->version().second, - new_row_block, - &filtered_rows)) { + if (!_row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, + new_row_block, &filtered_rows)) { LOG(WARNING) << "failed to change data in row block."; result = false; goto DIRECTLY_PROCESS_ERR; @@ -861,23 +897,21 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr // Check row num changes if (config::row_nums_check) { - if (rowset_reader->rowset()->num_rows() - != rowset_writer->num_rows() + merged_rows() + filtered_rows()) { + if (rowset_reader->rowset()->num_rows() != + rowset_writer->num_rows() + merged_rows() + filtered_rows()) { LOG(WARNING) << "fail to check row num! " - << "source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() - << ", new_index_rows=" << rowset_writer->num_rows(); + << "source_rows=" << rowset_reader->rowset()->num_rows() + << ", merged_rows=" << merged_rows() + << ", filtered_rows=" << filtered_rows() + << ", new_index_rows=" << rowset_writer->num_rows(); result = false; } LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() + << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << rowset_writer->num_rows(); } else { LOG(INFO) << "all row nums. 
source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() + << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << rowset_writer->num_rows(); } @@ -890,10 +924,10 @@ DIRECTLY_PROCESS_ERR: } SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, - size_t memory_limitation) : - _row_block_changer(row_block_changer), - _memory_limitation(memory_limitation), - _row_block_allocator(nullptr) { + size_t memory_limitation) + : _row_block_changer(row_block_changer), + _memory_limitation(memory_limitation), + _row_block_allocator(nullptr) { // 每次SchemaChange做外排的时候,会写一些临时版本(比如999,1000,1001),为避免Cache冲突,临时 // 版本进行2个处理: // 1. 随机值作为VersionHash @@ -908,13 +942,12 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() { SAFE_DELETE(_row_block_allocator); } -bool SchemaChangeWithSorting::process( - RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) { +bool SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader, + RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet, + TabletSharedPtr base_tablet) { if (_row_block_allocator == nullptr) { - _row_block_allocator = new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation); + _row_block_allocator = + new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation); if (_row_block_allocator == nullptr) { LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator); return false; @@ -932,14 +965,13 @@ bool SchemaChangeWithSorting::process( res = new_rowset_writer->flush(); if (res != OLAP_SUCCESS) { LOG(WARNING) << "create empty version for schema change failed." 
- << " version=" << new_rowset_writer->version().first - << "-" << new_rowset_writer->version().second; + << " version=" << new_rowset_writer->version().first << "-" + << new_rowset_writer->version().second; return false; } return true; } - bool result = true; RowBlockSorter row_block_sorter(_row_block_allocator); @@ -966,8 +998,9 @@ bool SchemaChangeWithSorting::process( RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); while (ref_row_block != nullptr && ref_row_block->has_remaining()) { - if (OLAP_SUCCESS != _row_block_allocator->allocate( - &new_row_block, ref_row_block->row_block_info().row_num, true)) { + if (OLAP_SUCCESS != _row_block_allocator->allocate(&new_row_block, + ref_row_block->row_block_info().row_num, + true)) { LOG(WARNING) << "failed to allocate RowBlock."; result = false; goto SORTING_PROCESS_ERR; @@ -986,14 +1019,11 @@ bool SchemaChangeWithSorting::process( if (use_beta_rowset) { new_rowset_type = BETA_ROWSET; } - if (!_internal_sorting(row_block_arr, - Version(_temp_delta_versions.second, - _temp_delta_versions.second), - rowset_reader->version_hash(), - new_tablet, - new_rowset_type, - segments_overlap, - &rowset)) { + if (!_internal_sorting( + row_block_arr, + Version(_temp_delta_versions.second, _temp_delta_versions.second), + rowset_reader->version_hash(), new_tablet, new_rowset_type, + segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; goto SORTING_PROCESS_ERR; @@ -1001,8 +1031,8 @@ bool SchemaChangeWithSorting::process( src_rowsets.push_back(rowset); - for (vector<RowBlock*>::iterator it = row_block_arr.begin(); - it != row_block_arr.end(); ++it) { + for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end(); + ++it) { _row_block_allocator->release(*it); } @@ -1014,8 +1044,7 @@ bool SchemaChangeWithSorting::process( } uint64_t filtered_rows = 0; - if (!_row_block_changer.change_row_block(ref_row_block, - rowset_reader->version().second, + 
if (!_row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second, new_row_block, &filtered_rows)) { LOG(WARNING) << "failed to change data in row block."; result = false; @@ -1051,11 +1080,8 @@ bool SchemaChangeWithSorting::process( } if (!_internal_sorting(row_block_arr, Version(_temp_delta_versions.second, _temp_delta_versions.second), - rowset_reader->version_hash(), - new_tablet, - new_rowset_type, - segments_overlap, - &rowset)) { + rowset_reader->version_hash(), new_tablet, new_rowset_type, + segments_overlap, &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; goto SORTING_PROCESS_ERR; @@ -1063,8 +1089,8 @@ bool SchemaChangeWithSorting::process( src_rowsets.push_back(rowset); - for (vector<RowBlock*>::iterator it = row_block_arr.begin(); - it != row_block_arr.end(); ++it) { + for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end(); + ++it) { _row_block_allocator->release(*it); } @@ -1078,8 +1104,8 @@ bool SchemaChangeWithSorting::process( res = new_rowset_writer->flush(); if (res != OLAP_SUCCESS) { LOG(WARNING) << "create empty version for schema change failed." - << " version=" << new_rowset_writer->version().first - << "-" << new_rowset_writer->version().second; + << " version=" << new_rowset_writer->version().first << "-" + << new_rowset_writer->version().second; return false; } } else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) { @@ -1092,8 +1118,8 @@ bool SchemaChangeWithSorting::process( // Check row num changes if (config::row_nums_check) { - if (rowset_reader->rowset()->num_rows() - != new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) { + if (rowset_reader->rowset()->num_rows() != + new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) { LOG(WARNING) << "fail to check row num!" 
<< " source_rows=" << rowset_reader->rowset()->num_rows() << ", merged_rows=" << merged_rows() @@ -1102,26 +1128,23 @@ bool SchemaChangeWithSorting::process( result = false; } LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() + << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << new_rowset_writer->num_rows(); } else { LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows() - << ", merged_rows=" << merged_rows() - << ", filtered_rows=" << filtered_rows() + << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows() << ", new_index_rows=" << new_rowset_writer->num_rows(); } SORTING_PROCESS_ERR: // remove the intermediate rowsets generated by internal sorting - for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin(); - it != src_rowsets.end(); ++it) { + for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin(); it != src_rowsets.end(); + ++it) { StorageEngine::instance()->add_unused_rowset(*it); } - for (vector<RowBlock*>::iterator it = row_block_arr.begin(); - it != row_block_arr.end(); ++it) { + for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end(); ++it) { _row_block_allocator->release(*it); } @@ -1130,8 +1153,7 @@ SORTING_PROCESS_ERR: } bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_block_arr, - const Version& version, - VersionHash version_hash, + const Version& version, VersionHash version_hash, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, @@ -1162,10 +1184,12 @@ bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_blo if (!merger.merge(row_block_arr, rowset_writer.get(), &merged_rows)) { LOG(WARNING) << "failed to merge row blocks."; - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + 
rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); return false; } - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); add_merged_rows(merged_rows); *rowset = rowset_writer->build(); return true; @@ -1186,11 +1210,12 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row } Merger::Statistics stats; - auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer, &stats); + auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer, + &stats); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to merge rowsets. tablet=" << new_tablet->full_name() - << ", version=" << rowset_writer->version().first - << "-" << rowset_writer->version().second; + << ", version=" << rowset_writer->version().first << "-" + << rowset_writer->version().second; return false; } add_merged_rows(stats.merged_rows); @@ -1207,7 +1232,8 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& << ", alter_version_hash=" << request.alter_version_hash; // Lock schema_change_lock util schema change info is stored in tablet header - if (!StorageEngine::instance()->tablet_manager()->try_schema_change_lock(request.base_tablet_id)) { + if (!StorageEngine::instance()->tablet_manager()->try_schema_change_lock( + request.base_tablet_id)) { LOG(WARNING) << "failed to obtain schema change lock. " << "base_tablet=" << request.base_tablet_id; return OLAP_ERR_TRY_LOCK_FAILED; @@ -1254,7 +1280,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe return res; } - LOG(INFO) << "finish to validate alter tablet request. 
begin to convert data from base tablet to new tablet" + LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet " + "to new tablet" << " base_tablet=" << base_tablet->full_name() << " new_tablet=" << new_tablet->full_name(); @@ -1298,7 +1325,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe // should check the max_version >= request.alter_version, if not the convert is useless RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version(); if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) { - LOG(WARNING) << "base tablet's max version=" << (max_rowset == nullptr ? 0 : max_rowset->end_version()) + LOG(WARNING) << "base tablet's max version=" + << (max_rowset == nullptr ? 0 : max_rowset->end_version()) << " is less than request version=" << request.alter_version; res = OLAP_ERR_VERSION_NOT_EXIST; break; @@ -1335,7 +1363,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe } } - res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), end_version); + res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), + end_version); if (res != OLAP_SUCCESS) { LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name() << ", end_version=" << end_version; @@ -1395,7 +1424,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe break; } new_tablet->save_meta(); - } while(0); + } while (0); if (res == OLAP_SUCCESS) { // _validate_alter_result should be outside the above while loop. 
@@ -1413,11 +1442,10 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe return res; } -OLAPStatus SchemaChangeHandler::schema_version_convert( - TabletSharedPtr base_tablet, - TabletSharedPtr new_tablet, - RowsetSharedPtr* base_rowset, - RowsetSharedPtr* new_rowset) { +OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet, + TabletSharedPtr new_tablet, + RowsetSharedPtr* base_rowset, + RowsetSharedPtr* new_rowset) { OLAPStatus res = OLAP_SUCCESS; LOG(INFO) << "begin to convert delta version for schema changing. " << "base_tablet=" << base_tablet->full_name() @@ -1429,11 +1457,8 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( bool sc_sorting = false; bool sc_directly = false; - if (OLAP_SUCCESS != (res = _parse_request(base_tablet, - new_tablet, - &rb_changer, - &sc_sorting, - &sc_directly))) { + if (OLAP_SUCCESS != + (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly))) { LOG(WARNING) << "failed to parse the request. 
res=" << res; return res; } @@ -1444,16 +1469,16 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( SchemaChange* sc_procedure = nullptr; if (sc_sorting) { size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change; - LOG(INFO) << "doing schema change with sorting for base_tablet " << base_tablet->full_name(); - sc_procedure = new(nothrow) SchemaChangeWithSorting( - rb_changer, - memory_limitation * 1024 * 1024 * 1024); + LOG(INFO) << "doing schema change with sorting for base_tablet " + << base_tablet->full_name(); + sc_procedure = new (nothrow) + SchemaChangeWithSorting(rb_changer, memory_limitation * 1024 * 1024 * 1024); } else if (sc_directly) { LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name(); - sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer); + sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer); } else { LOG(INFO) << "doing linked schema change for base_tablet " << base_tablet->full_name(); - sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer); + sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer); } if (sc_procedure == nullptr) { @@ -1505,18 +1530,20 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( if ((*base_rowset)->is_pending()) { LOG(WARNING) << "failed to process the transaction when schema change. " << "tablet=" << new_tablet->full_name() << "'" - << ", transaction="<< (*base_rowset)->txn_id(); + << ", transaction=" << (*base_rowset)->txn_id(); } else { LOG(WARNING) << "failed to process the version. 
" - << "version=" << (*base_rowset)->version().first - << "-" << (*base_rowset)->version().second; + << "version=" << (*base_rowset)->version().first << "-" + << (*base_rowset)->version().second; } res = OLAP_ERR_INPUT_PARAMETER_ERROR; - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); goto SCHEMA_VERSION_CONVERT_ERR; } *new_rowset = rowset_writer->build(); - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); if (*new_rowset == nullptr) { LOG(WARNING) << "build rowset failed."; res = OLAP_ERR_MALLOC_ERROR; @@ -1536,15 +1563,13 @@ SCHEMA_VERSION_CONVERT_ERR: SAFE_DELETE(sc_procedure); LOG(WARNING) << "failed to convert rowsets. " - << " base_tablet=" << base_tablet->full_name() - << ", new_tablet=" << new_tablet->full_name() - << " res = " << res; + << " base_tablet=" << base_tablet->full_name() + << ", new_tablet=" << new_tablet->full_name() << " res = " << res; return res; } OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed( - TabletSharedPtr base_tablet, - vector<Version>* versions_to_be_changed) { + TabletSharedPtr base_tablet, vector<Version>* versions_to_be_changed) { RowsetSharedPtr rowset = base_tablet->rowset_with_max_version(); if (rowset == nullptr) { LOG(WARNING) << "Tablet has no version. 
base_tablet=" << base_tablet->full_name(); @@ -1552,7 +1577,8 @@ OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed( } vector<Version> span_versions; - RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), &span_versions)); + RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), + &span_versions)); for (uint32_t i = 0; i < span_versions.size(); i++) { versions_to_be_changed->push_back(span_versions[i]); } @@ -1560,40 +1586,34 @@ OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed( return OLAP_SUCCESS; } -OLAPStatus SchemaChangeHandler::_add_alter_task( - AlterTabletType alter_tablet_type, - TabletSharedPtr base_tablet, - TabletSharedPtr new_tablet, - const vector<Version>& versions_to_be_changed) { - +OLAPStatus SchemaChangeHandler::_add_alter_task(AlterTabletType alter_tablet_type, + TabletSharedPtr base_tablet, + TabletSharedPtr new_tablet, + const vector<Version>& versions_to_be_changed) { // check new tablet exists, // prevent to set base's status after new's dropping (clear base's status) if (StorageEngine::instance()->tablet_manager()->get_tablet( - new_tablet->tablet_id(), new_tablet->schema_hash()) == nullptr) { + new_tablet->tablet_id(), new_tablet->schema_hash()) == nullptr) { LOG(WARNING) << "new_tablet does not exist. tablet=" << new_tablet->full_name(); return OLAP_ERR_TABLE_NOT_FOUND; } // 1. 
在新表和旧表中添加schema change标志 base_tablet->delete_alter_task(); - base_tablet->add_alter_task(new_tablet->tablet_id(), - new_tablet->schema_hash(), - versions_to_be_changed, - alter_tablet_type); + base_tablet->add_alter_task(new_tablet->tablet_id(), new_tablet->schema_hash(), + versions_to_be_changed, alter_tablet_type); base_tablet->save_meta(); - new_tablet->add_alter_task(base_tablet->tablet_id(), - base_tablet->schema_hash(), - vector<Version>(), // empty versions + new_tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(), + vector<Version>(), // empty versions alter_tablet_type); new_tablet->save_meta(); LOG(INFO) << "successfully add alter task to both base and new"; return OLAP_SUCCESS; } -OLAPStatus SchemaChangeHandler::_save_alter_state( - AlterTabletState state, - TabletSharedPtr base_tablet, - TabletSharedPtr new_tablet) { +OLAPStatus SchemaChangeHandler::_save_alter_state(AlterTabletState state, + TabletSharedPtr base_tablet, + TabletSharedPtr new_tablet) { WriteLock base_wlock(base_tablet->get_header_lock_ptr()); WriteLock new_wlock(new_tablet->get_header_lock_ptr()); AlterTabletTaskSharedPtr base_alter_task = base_tablet->alter_task(); @@ -1604,8 +1624,7 @@ OLAPStatus SchemaChangeHandler::_save_alter_state( OLAPStatus res = base_tablet->set_alter_state(state); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to set alter state to " << state - << " tablet=" << base_tablet->full_name() - << " res=" << res; + << " tablet=" << base_tablet->full_name() << " res=" << res; return res; } base_tablet->save_meta(); @@ -1616,9 +1635,8 @@ OLAPStatus SchemaChangeHandler::_save_alter_state( } res = new_tablet->set_alter_state(state); if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to set alter state to " << state - << " tablet " << new_tablet->full_name() - << " res" << res; + LOG(WARNING) << "failed to set alter state to " << state << " tablet " + << new_tablet->full_name() << " res" << res; return res; } new_tablet->save_meta(); @@ 
-1641,16 +1659,16 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa // change中增加了filter信息,在_parse_request中会设置filter的column信息 // 并在每次row block的change时,过滤一些数据 - RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), - sc_params.base_tablet, sc_params.delete_handler); + RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.base_tablet, + sc_params.delete_handler); bool sc_sorting = false; bool sc_directly = false; SchemaChange* sc_procedure = nullptr; // a. 解析Alter请求,转换成内部的表示形式 - OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, - &rb_changer, &sc_sorting, &sc_directly); + OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, + &sc_sorting, &sc_directly); if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to parse the request. res=" << res; goto PROCESS_ALTER_EXIT; @@ -1659,15 +1677,18 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa // b. 
生成历史数据转换器 if (sc_sorting) { size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change; - LOG(INFO) << "doing schema change with sorting for base_tablet " << sc_params.base_tablet->full_name(); - sc_procedure = new(nothrow) SchemaChangeWithSorting(rb_changer, - memory_limitation * 1024 * 1024 * 1024); + LOG(INFO) << "doing schema change with sorting for base_tablet " + << sc_params.base_tablet->full_name(); + sc_procedure = new (nothrow) + SchemaChangeWithSorting(rb_changer, memory_limitation * 1024 * 1024 * 1024); } else if (sc_directly) { - LOG(INFO) << "doing schema change directly for base_tablet " << sc_params.base_tablet->full_name(); - sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer); + LOG(INFO) << "doing schema change directly for base_tablet " + << sc_params.base_tablet->full_name(); + sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer); } else { - LOG(INFO) << "doing linked schema change for base_tablet " << sc_params.base_tablet->full_name(); - sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer); + LOG(INFO) << "doing linked schema change for base_tablet " + << sc_params.base_tablet->full_name(); + sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer); } if (sc_procedure == nullptr) { @@ -1679,8 +1700,8 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa // c. 转换历史数据 for (auto& rs_reader : sc_params.ref_rowset_readers) { - VLOG(10) << "begin to convert a history rowset. version=" - << rs_reader->version().first << "-" << rs_reader->version().second; + VLOG(10) << "begin to convert a history rowset. 
version=" << rs_reader->version().first + << "-" << rs_reader->version().second; // set status for monitor // 只要有一个new_table为running,ref table就设置为running @@ -1714,15 +1735,18 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa goto PROCESS_ALTER_EXIT; } - if (!sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, sc_params.base_tablet)) { + if (!sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, + sc_params.base_tablet)) { LOG(WARNING) << "failed to process the version." - << " version=" << rs_reader->version().first - << "-" << rs_reader->version().second; + << " version=" << rs_reader->version().first << "-" + << rs_reader->version().second; res = OLAP_ERR_INPUT_PARAMETER_ERROR; - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); goto PROCESS_ALTER_EXIT; } - new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string()); + new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + + rowset_writer->rowset_id().to_string()); // 将新版本的数据加入header // 为了防止死锁的出现,一定要先锁住旧表,再锁住新表 sc_params.new_tablet->obtain_push_lock(); @@ -1735,37 +1759,34 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa res = sc_params.new_tablet->add_rowset(new_rowset, false); if (res == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) { LOG(WARNING) << "version already exist, version revert occured. 
" - << "tablet=" << sc_params.new_tablet->full_name() - << ", version='" << rs_reader->version().first - << "-" << rs_reader->version().second; + << "tablet=" << sc_params.new_tablet->full_name() << ", version='" + << rs_reader->version().first << "-" << rs_reader->version().second; StorageEngine::instance()->add_unused_rowset(new_rowset); res = OLAP_SUCCESS; } else if (res != OLAP_SUCCESS) { LOG(WARNING) << "failed to register new version. " << " tablet=" << sc_params.new_tablet->full_name() - << ", version=" << rs_reader->version().first - << "-" << rs_reader->version().second; + << ", version=" << rs_reader->version().first << "-" + << rs_reader->version().second; StorageEngine::instance()->add_unused_rowset(new_rowset); sc_params.new_tablet->release_push_lock(); goto PROCESS_ALTER_EXIT; } else { VLOG(3) << "register new version. tablet=" << sc_params.new_tablet->full_name() - << ", version=" << rs_reader->version().first - << "-" << rs_reader->version().second; + << ", version=" << rs_reader->version().first << "-" + << rs_reader->version().second; } sc_params.new_tablet->release_push_lock(); VLOG(10) << "succeed to convert a history version." 
- << " version=" << rs_reader->version().first - << "-" << rs_reader->version().second; + << " version=" << rs_reader->version().first << "-" << rs_reader->version().second; } // XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本 -PROCESS_ALTER_EXIT: - { - // save tablet meta here because rowset meta is not saved during add rowset - WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr()); - sc_params.new_tablet->save_meta(); - } +PROCESS_ALTER_EXIT : { + // save tablet meta here because rowset meta is not saved during add rowset + WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr()); + sc_params.new_tablet->save_meta(); +} if (res == OLAP_SUCCESS) { Version test_version(0, end_version); res = sc_params.new_tablet->check_version_integrity(test_version); @@ -1782,14 +1803,13 @@ PROCESS_ALTER_EXIT: // 分析column的mapping以及filter key的mapping OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, - RowBlockChanger* rb_changer, - bool* sc_sorting, + RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly) { OLAPStatus res = OLAP_SUCCESS; // set column mapping for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns(); - i < new_schema_size; ++i) { + i < new_schema_size; ++i) { const TabletColumn& new_column = new_tablet->tablet_schema().column(i); const string& column_name = new_column.name(); ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i); @@ -1799,8 +1819,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, if (column_index < 0) { LOG(WARNING) << "referenced column was missing. 
" - << "[column=" << column_name - << " referenced_column=" << column_index << "]"; + << "[column=" << column_name << " referenced_column=" << column_index + << "]"; return OLAP_ERR_CE_CMD_PARAMS_ERROR; } @@ -1825,10 +1845,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, *sc_directly = true; } - if (OLAP_SUCCESS != (res = _init_column_mapping( - column_mapping, - new_column, - new_column.default_value()))) { + if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column, + new_column.default_value()))) { return res; } @@ -1838,14 +1856,10 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, continue; } - // XXX: 只有DROP COLUMN时,遇到新Schema转旧Schema时会进入这里。 column_mapping->ref_column = -1; - if (OLAP_SUCCESS != (res = _init_column_mapping( - column_mapping, - new_column, - ""))) { + if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column, ""))) { return res; } @@ -1859,8 +1873,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, // 若Key列的引用序列出现乱序,则需要重排序 int num_default_value = 0; - for (int i = 0, new_schema_size = new_tablet->num_key_columns(); - i < new_schema_size; ++i) { + for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) { ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i); if (column_mapping->ref_column < 0) { @@ -1887,29 +1900,30 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, if (column_mapping->ref_column < 0) { continue; } else { - if (new_tablet_schema.column(i).type() != ref_tablet_schema.column(column_mapping->ref_column).type()) { + if (new_tablet_schema.column(i).type() != + ref_tablet_schema.column(column_mapping->ref_column).type()) { *sc_directly = true; return OLAP_SUCCESS; - } else if ( - (new_tablet_schema.column(i).type() == ref_tablet_schema.column(column_mapping->ref_column).type()) - && (new_tablet_schema.column(i).length() - != 
ref_tablet_schema.column(column_mapping->ref_column).length())) { + } else if ((new_tablet_schema.column(i).type() == + ref_tablet_schema.column(column_mapping->ref_column).type()) && + (new_tablet_schema.column(i).length() != + ref_tablet_schema.column(column_mapping->ref_column).length())) { *sc_directly = true; return OLAP_SUCCESS; - } else if (new_tablet_schema.column(i).is_bf_column() - != ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) { + } else if (new_tablet_schema.column(i).is_bf_column() != + ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) { *sc_directly = true; return OLAP_SUCCESS; - } else if (new_tablet_schema.column(i).has_bitmap_index() - != ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) { + } else if (new_tablet_schema.column(i).has_bitmap_index() != + ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) { *sc_directly = true; return OLAP_SUCCESS; } } } - if (base_tablet->delete_predicates().size() != 0){ + if (base_tablet->delete_predicates().size() != 0) { //there exists delete condition in header, can't do linked schema change *sc_directly = true; } @@ -1941,10 +1955,12 @@ OLAPStatus SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mappi return OLAP_SUCCESS; } -OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request) { +OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet, + const TAlterTabletReqV2& request) { Version max_continuous_version = {-1, 0}; VersionHash max_continuous_version_hash = 0; - new_tablet->max_continuous_version_from_begining(&max_continuous_version, &max_continuous_version_hash); + new_tablet->max_continuous_version_from_begining(&max_continuous_version, + &max_continuous_version_hash); LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name() << ", start_version=" << max_continuous_version.first << ", 
end_version=" << max_continuous_version.second; @@ -1955,4 +1971,4 @@ OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_table } } -} // namespace doris +} // namespace doris diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index ac6f55d..c749fec 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -23,11 +23,11 @@ #include <vector> #include "gen_cpp/AgentService_types.h" +#include "olap/column_mapping.h" #include "olap/delete_handler.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer.h" #include "olap/tablet.h" -#include "olap/column_mapping.h" namespace doris { // defined in 'field.h' @@ -42,26 +42,19 @@ class RowCursor; class RowBlockChanger { public: - RowBlockChanger(const TabletSchema& tablet_schema, - const TabletSharedPtr& base_tablet, + RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet, const DeleteHandler& delete_handler); - RowBlockChanger(const TabletSchema& tablet_schema, - const TabletSharedPtr& base_tablet); + RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet); virtual ~RowBlockChanger(); ColumnMapping* get_mutable_column_mapping(size_t column_index); - SchemaMapping get_schema_mapping() const { - return _schema_mapping; - } + SchemaMapping get_schema_mapping() const { return _schema_mapping; } - bool change_row_block( - const RowBlock* ref_block, - int32_t data_version, - RowBlock* mutable_block, - uint64_t* filtered_rows) const; + bool change_row_block(const RowBlock* ref_block, int32_t data_version, RowBlock* mutable_block, + uint64_t* filtered_rows) const; private: // @brief column-mapping specification of new schema @@ -78,8 +71,7 @@ public: RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation); virtual ~RowBlockAllocator(); - OLAPStatus allocate(RowBlock** row_block, size_t num_rows, - bool null_supported); + OLAPStatus allocate(RowBlock** row_block, size_t 
num_rows, bool null_supported); void release(RowBlock* row_block); private: @@ -94,34 +86,20 @@ public: SchemaChange() : _filtered_rows(0), _merged_rows(0) {} virtual ~SchemaChange() {} - virtual bool process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_builder, - TabletSharedPtr tablet, - TabletSharedPtr base_tablet) = 0; + virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder, + TabletSharedPtr tablet, TabletSharedPtr base_tablet) = 0; - void add_filtered_rows(uint64_t filtered_rows) { - _filtered_rows += filtered_rows; - } + void add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; } - void add_merged_rows(uint64_t merged_rows) { - _merged_rows += merged_rows; - } + void add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; } - uint64_t filtered_rows() const { - return _filtered_rows; - } + uint64_t filtered_rows() const { return _filtered_rows; } - uint64_t merged_rows() const { - return _merged_rows; - } + uint64_t merged_rows() const { return _merged_rows; } - void reset_filtered_rows() { - _filtered_rows = 0; - } + void reset_filtered_rows() { _filtered_rows = 0; } - void reset_merged_rows() { - _merged_rows = 0; - } + void reset_merged_rows() { _merged_rows = 0; } private: uint64_t _filtered_rows; @@ -131,13 +109,12 @@ private: class LinkedSchemaChange : public SchemaChange { public: explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer) - : _row_block_changer(row_block_changer) { } + : _row_block_changer(row_block_changer) {} ~LinkedSchemaChange() {} - bool process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) override; + bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; + private: const RowBlockChanger& _row_block_changer; 
DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange); @@ -148,14 +125,11 @@ class SchemaChangeDirectly : public SchemaChange { public: // @params tablet the instance of tablet which has new schema. // @params row_block_changer changer to modifiy the data of RowBlock - explicit SchemaChangeDirectly( - const RowBlockChanger& row_block_changer); + explicit SchemaChangeDirectly(const RowBlockChanger& row_block_changer); virtual ~SchemaChangeDirectly(); - virtual bool process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_writer, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) override; + virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; private: const RowBlockChanger& _row_block_changer; @@ -170,30 +144,21 @@ private: // @breif schema change with sorting class SchemaChangeWithSorting : public SchemaChange { public: - explicit SchemaChangeWithSorting( - const RowBlockChanger& row_block_changer, - size_t memory_limitation); + explicit SchemaChangeWithSorting(const RowBlockChanger& row_block_changer, + size_t memory_limitation); virtual ~SchemaChangeWithSorting(); - virtual bool process(RowsetReaderSharedPtr rowset_reader, - RowsetWriter* new_rowset_builder, - TabletSharedPtr new_tablet, - TabletSharedPtr base_tablet) override; + virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder, + TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override; private: - bool _internal_sorting( - const std::vector<RowBlock*>& row_block_arr, - const Version& temp_delta_versions, - const VersionHash version_hash, - TabletSharedPtr new_tablet, - RowsetTypePB new_rowset_type, - SegmentsOverlapPB segments_overlap, - RowsetSharedPtr* rowset); - - bool _external_sorting( - std::vector<RowsetSharedPtr>& src_rowsets, - RowsetWriter* rowset_writer, - TabletSharedPtr new_tablet); + bool _internal_sorting(const 
std::vector<RowBlock*>& row_block_arr, + const Version& temp_delta_versions, const VersionHash version_hash, + TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset); + + bool _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer, + TabletSharedPtr new_tablet); const RowBlockChanger& _row_block_changer; size_t _memory_limitation; @@ -209,13 +174,11 @@ public: virtual ~SchemaChangeHandler() {} OLAPStatus process_alter_tablet(AlterTabletType alter_tablet_type, - const TAlterTabletReq& request); + const TAlterTabletReq& request); + + OLAPStatus schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, + RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset); - OLAPStatus schema_version_convert(TabletSharedPtr base_tablet, - TabletSharedPtr new_tablet, - RowsetSharedPtr* base_rowset, - RowsetSharedPtr* new_rowset); - // schema change v2, it will not set alter task in base tablet OLAPStatus process_alter_tablet_v2(const TAlterTabletReqV2& request); @@ -241,36 +204,33 @@ private: // add alter task to base_tablet and new_tablet. // add A->(B|C|...) relation chain to all of them. 
- OLAPStatus _add_alter_task(AlterTabletType alter_tablet_type, - TabletSharedPtr base_tablet, + OLAPStatus _add_alter_task(AlterTabletType alter_tablet_type, TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, const std::vector<Version>& versions_to_be_changed); - OLAPStatus _save_alter_state(AlterTabletState state, - TabletSharedPtr base_tablet, + OLAPStatus _save_alter_state(AlterTabletState state, TabletSharedPtr base_tablet, TabletSharedPtr new_tablet); - + OLAPStatus _do_process_alter_tablet_v2(const TAlterTabletReqV2& request); - + OLAPStatus _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request); static OLAPStatus _convert_historical_rowsets(const SchemaChangeParams& sc_params); - static OLAPStatus _parse_request(TabletSharedPtr base_tablet, - TabletSharedPtr new_tablet, - RowBlockChanger* rb_changer, - bool* sc_sorting, + static OLAPStatus _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, + RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly); // 需要新建default_value时的初始化设置 static OLAPStatus _init_column_mapping(ColumnMapping* column_mapping, const TabletColumn& column_schema, const std::string& value); + private: RowsetReaderContext _reader_context; DISALLOW_COPY_AND_ASSIGN(SchemaChangeHandler); }; -} // namespace doris +} // namespace doris #endif // DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H diff --git a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md index c57f3ee..d1708d9 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md @@ -154,7 +154,6 @@ under the License. 5) The following types of conversions are currently supported (accuracy loss is guaranteed by the user) TINYINT/SMALLINT/INT/BIGINT is converted to TINYINT/SMALLINT/INT/BIGINT/DOUBLE. 
TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE/DECIMAL is converted to VARCHAR - Convert LARGEINT to DOUBLE VARCHAR supports modification of maximum length Convert VARCHAR to TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE. Convert VARCHAR to DATE (currently support six formats: "%Y-%m-%d", "%y-%m-%d", "%Y%m%d", "%y%m%d", "%Y/%m/%d", "%y/%m/%d") diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md index 4f5f9e6..3ac35b4 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md @@ -152,7 +152,6 @@ under the License. 5) 目前支持以下类型的转换(精度损失由用户保证) TINYINT/SMALLINT/INT/BIGINT 转换成 TINYINT/SMALLINT/INT/BIGINT/DOUBLE。 TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE/DECIMAL 转换成 VARCHAR - LARGEINT 转换成 DOUBLE VARCHAR 支持修改最大长度 VARCHAR 转换成 TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE VARCHAR 转换成 DATE (目前支持"%Y-%m-%d", "%y-%m-%d", "%Y%m%d", "%y%m%d", "%Y/%m/%d", "%y/%m/%d"六种格式化格式) diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java index ae67839..53b65c7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/src/main/java/org/apache/doris/catalog/Column.java @@ -288,6 +288,12 @@ public class Column implements Writable { } } + // now we support convert decimal to varchar type + if ((getDataType() == PrimitiveType.DECIMAL && other.getDataType() == PrimitiveType.VARCHAR) + || (getDataType() == PrimitiveType.DECIMALV2 && other.getDataType() == PrimitiveType.VARCHAR)) { + return; + } + if (this.getPrecision() != other.getPrecision()) { throw new DdlException("Cannot change precision"); } diff --git a/fe/src/main/java/org/apache/doris/catalog/ColumnType.java b/fe/src/main/java/org/apache/doris/catalog/ColumnType.java index 557a6f5..f27e414 100644 ---
a/fe/src/main/java/org/apache/doris/catalog/ColumnType.java +++ b/fe/src/main/java/org/apache/doris/catalog/ColumnType.java @@ -75,14 +75,17 @@ public abstract class ColumnType { schemaChangeMatrix[PrimitiveType.CHAR.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true; schemaChangeMatrix[PrimitiveType.CHAR.ordinal()][PrimitiveType.CHAR.ordinal()] = true; - schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DATE.ordinal()] = true; - schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.FLOAT.ordinal()] = true; - schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DOUBLE.ordinal()] = true; schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.TINYINT.ordinal()] = true; schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.SMALLINT.ordinal()] = true; schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.INT.ordinal()] = true; schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.BIGINT.ordinal()] = true; schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.LARGEINT.ordinal()] = true; + schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.FLOAT.ordinal()] = true; + schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DOUBLE.ordinal()] = true; + schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DATE.ordinal()] = true; + + schemaChangeMatrix[PrimitiveType.DECIMAL.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true; + schemaChangeMatrix[PrimitiveType.DECIMALV2.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true; schemaChangeMatrix[PrimitiveType.DATETIME.ordinal()][PrimitiveType.DATE.ordinal()] = true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org