This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a467e7a  [refactor][fix] small fixes and code cleanups related to schema change (#8328)
a467e7a is described below

commit a467e7a7906761549345a70419335895dec52a37
Author: dataroaring <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Sat Mar 12 22:05:43 2022 +0800

    [refactor][fix] small fixes and code cleanups related to schema change (#8328)
    
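    Currently, using RowBlockAllocator::allocate is a little complicated
    because its return value is ambiguous: some callers test only the return
    value, while others test both the return value and whether the returned
    pointer is non-null. This patch makes it return a success code only when
    it actually succeeds, so callers can simply test the return value.
    
    The patch also makes RowBlockMerger::merge check the result of
    _make_heap, changes _pop_heap (which could never fail) to return void,
    simplifies some schema-change code paths, and makes ColumnRenameClause
    use AlterOpType.RENAME instead of AlterOpType.SCHEMA_CHANGE.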
---
 be/src/olap/schema_change.cpp                      | 104 ++++++---------------
 .../apache/doris/analysis/ColumnRenameClause.java  |   2 +-
 2 files changed, 32 insertions(+), 74 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 0bea116..c0b2085 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -92,7 +92,7 @@ private:
     };
 
     bool _make_heap(const std::vector<RowBlock*>& row_block_arr);
-    bool _pop_heap();
+    void _pop_heap();
 
     TabletSharedPtr _tablet;
     std::priority_queue<MergeElement> _heap;
@@ -693,8 +693,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
         }
 
         if (_row_block_allocator->allocate(&_swap_row_block, row_num, 
null_supported) !=
-                    OLAP_SUCCESS ||
-            _swap_row_block == nullptr) {
+                    OLAP_SUCCESS) {
             LOG(WARNING) << "fail to allocate memory.";
             return false;
         }
@@ -769,7 +768,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** 
row_block, size_t num_rows, bo
                      << "You can increase the memory "
                      << "by changing the 
Config.memory_limitation_per_thread_for_schema_change";
         *row_block = nullptr;
-        return OLAP_SUCCESS;
+        return OLAP_ERR_INPUT_PARAMETER_ERROR;
     }
 
     // TODO(lijiao) : Why abandon the original m_row_block_buffer
@@ -832,16 +831,17 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& 
row_block_arr, RowsetWr
         goto MERGE_ERR;
     }
 
-    _make_heap(row_block_arr);
+    if (!_make_heap(row_block_arr)) {
+        // There is error log in _make_heap, so no need to more log.
+        goto MERGE_ERR;
+    }
 
     row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
     while (_heap.size() > 0) {
         init_row_with_others(&row_cursor, *(_heap.top().row_cursor), 
mem_pool.get(),
                              agg_object_pool.get());
 
-        if (!_pop_heap()) {
-            goto MERGE_ERR;
-        }
+        _pop_heap();
 
         if (KeysType::DUP_KEYS == _tablet->keys_type()) {
             if (rowset_writer->add_row(row_cursor) != OLAP_SUCCESS) {
@@ -856,9 +856,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& 
row_block_arr, RowsetWr
             // we should fix this trick ASAP
             agg_update_row(&row_cursor, *(_heap.top().row_cursor), nullptr);
             ++tmp_merged_rows;
-            if (!_pop_heap()) {
-                goto MERGE_ERR;
-            }
+            _pop_heap();
         }
         agg_finalize_row(&row_cursor, mem_pool.get());
         if (rowset_writer->add_row(row_cursor) != OLAP_SUCCESS) {
@@ -915,19 +913,19 @@ bool RowBlockMerger::_make_heap(const 
std::vector<RowBlock*>& row_block_arr) {
     return true;
 }
 
-bool RowBlockMerger::_pop_heap() {
+void RowBlockMerger::_pop_heap() {
     MergeElement element = _heap.top();
     _heap.pop();
 
     if (++element.row_block_index >= 
element.row_block->row_block_info().row_num) {
         SAFE_DELETE(element.row_cursor);
-        return true;
+        return;
     }
 
     element.row_block->get_row(element.row_block_index, element.row_cursor);
 
     _heap.push(element);
-    return true;
+    return;
 }
 
 OLAPStatus LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
@@ -1023,19 +1021,9 @@ OLAPStatus 
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         }
     }
 
-    bool need_create_empty_version = false;
     OLAPStatus res = OLAP_SUCCESS;
-    if (!rowset_reader->rowset()->empty()) {
-        int num_rows = rowset_reader->rowset()->num_rows();
-        if (num_rows == 0) {
-            // actually, the rowset is empty
-            need_create_empty_version = true;
-        }
-    } else {
-        need_create_empty_version = true;
-    }
-
-    if (need_create_empty_version) {
+    if (rowset_reader->rowset()->empty() ||
+        rowset_reader->rowset()->num_rows() == 0) {
         res = rowset_writer->flush();
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "create empty version for schema change failed."
@@ -1104,14 +1092,10 @@ OLAPStatus 
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
                          << ", new_index_rows=" << rowset_writer->num_rows();
             res = OLAP_ERR_ALTER_STATUS_ERR;
         }
-        LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
-                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" 
<< filtered_rows()
-                  << ", new_index_rows=" << rowset_writer->num_rows();
-    } else {
-        LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
-                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" 
<< filtered_rows()
-                  << ", new_index_rows=" << rowset_writer->num_rows();
     }
+    LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
+              << ", merged_rows=" << merged_rows() << ", filtered_rows=" << 
filtered_rows()
+              << ", new_index_rows=" << rowset_writer->num_rows();
     return res;
 }
 
@@ -1147,14 +1131,10 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
         }
     }
 
-    bool need_create_empty_version = false;
     OLAPStatus res = OLAP_SUCCESS;
     RowsetSharedPtr rowset = rowset_reader->rowset();
-    if (rowset->empty() || rowset->num_rows() == 0) {
-        need_create_empty_version = true;
-    }
 
-    if (need_create_empty_version) {
+    if (rowset->empty() || rowset->num_rows() == 0) {
         res = new_rowset_writer->flush();
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "create empty version for schema change failed."
@@ -1194,10 +1174,7 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
     reset_merged_rows();
     reset_filtered_rows();
 
-    bool use_beta_rowset = false;
-    if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-        use_beta_rowset = true;
-    }
+    bool use_beta_rowset = new_tablet->tablet_meta()->preferred_rowset_type() 
== BETA_ROWSET;
 
     SegmentsOverlapPB segments_overlap = 
rowset->rowset_meta()->segments_overlap();
     RowBlock* ref_row_block = nullptr;
@@ -1336,14 +1313,10 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
                          << ", new_index_rows=" << 
new_rowset_writer->num_rows();
             res = OLAP_ERR_ALTER_STATUS_ERR;
         }
-        LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
-                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" 
<< filtered_rows()
-                  << ", new_index_rows=" << new_rowset_writer->num_rows();
-    } else {
-        LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
-                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" 
<< filtered_rows()
-                  << ", new_index_rows=" << new_rowset_writer->num_rows();
     }
+    LOG(INFO) << "all row nums. source_rows=" << 
rowset_reader->rowset()->num_rows()
+              << ", merged_rows=" << merged_rows() << ", filtered_rows=" << 
filtered_rows()
+              << ", new_index_rows=" << new_rowset_writer->num_rows();
     return res;
 }
 
@@ -2049,35 +2022,20 @@ OLAPStatus SchemaChangeHandler::_parse_request(
         }
 
         // Newly added column go here
-        //if (new_column_schema.is_allow_null || 
new_column_schema.has_default_value) {
-        {
-            column_mapping->ref_column = -1;
-
-            if (i < base_tablet->num_short_key_columns()) {
-                *sc_directly = true;
-            }
-
-            if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, 
new_column,
-                                                            
new_column.default_value()))) {
-                return res;
-            }
+        column_mapping->ref_column = -1;
 
-            VLOG_TRACE << "A column with default value will be added after 
schema changing. "
-                       << "column=" << column_name
-                       << ", default_value=" << new_column.default_value();
-            continue;
+        if (i < base_tablet->num_short_key_columns()) {
+            *sc_directly = true;
         }
 
-        // XXX: Only when DROP COLUMN, you will enter here when you encounter 
a new Schema to an old Schema。
-        column_mapping->ref_column = -1;
-
-        if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, 
new_column, ""))) {
-            return res;
+        if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, 
new_column,
+                                                        
new_column.default_value()))) {
+           return res;
         }
 
-        VLOG_NOTICE << "A new schema delta is converted while dropping column. 
"
-                    << "Dropped column will be assigned as '0' for the older 
schema. "
-                    << "column=" << column_name;
+        VLOG_TRACE << "A column with default value will be added after schema 
changing. "
+                   << "column=" << column_name
+                   << ", default_value=" << new_column.default_value();
     }
 
     // Check if re-aggregation is needed.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
index e3d9d69..5924da5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnRenameClause.java
@@ -31,7 +31,7 @@ public class ColumnRenameClause extends AlterTableClause {
     private String newColName;
 
     public ColumnRenameClause(String colName, String newColName) {
-        super(AlterOpType.SCHEMA_CHANGE);
+        super(AlterOpType.RENAME);
         this.colName = colName;
         this.newColName = newColName;
         this.needTableStable = false;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to