This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ea05f  Add convert tablet rowset type (#2294)
63ea05f is described below

commit 63ea05f9c72e80b71ba64426776f36739e0be1d0
Author: kangpinghuang <kphu...@163.com>
AuthorDate: Wed Dec 18 18:49:47 2019 +0800

    Add convert tablet rowset type (#2294)
    
    to solve the issue #2246.
    
    The scheme is as follows:
    
        Add an optional preferred_rowset_type to TabletMeta for V2-format rollup index tablets.
        Add a boolean session variable use_v2_rollup; if it is set to true, queries will use the V2 storage format rollup index.
        Test queries will be sent to the online service to verify the correctness of segment-v2: the same queries are sent to the FE with and without use_v2_rollup set, and the returned results are compared.
---
 be/src/olap/compaction.cpp                         |  3 ++
 be/src/olap/delta_writer.cpp                       |  3 ++
 be/src/olap/schema_change.cpp                      | 31 ++++++++++++++++--
 be/src/olap/tablet_manager.cpp                     |  3 ++
 be/src/olap/tablet_meta.cpp                        | 12 ++++++-
 be/src/olap/tablet_meta.h                          |  9 +++++
 .../cn/administrator-guide/variables.md            |  4 +++
 .../en/administrator-guide/variables_EN.md         |  4 +++
 fe/src/main/java/org/apache/doris/alter/Alter.java |  1 +
 .../doris/alter/MaterializedViewHandler.java       | 38 +++++++++++++++++++---
 .../java/org/apache/doris/alter/RollupJobV2.java   | 11 ++++++-
 .../apache/doris/alter/SchemaChangeHandler.java    | 11 ++++++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 10 ++++++
 .../analysis/ModifyTablePropertiesClause.java      |  5 +++
 .../apache/doris/common/util/PropertyAnalyzer.java | 26 +++++++++++++++
 .../org/apache/doris/planner/RollupSelector.java   | 22 +++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  9 +++++
 .../org/apache/doris/task/CreateReplicaTask.java   | 11 +++++++
 gensrc/proto/olap_file.proto                       |  1 +
 gensrc/thrift/AgentService.thrift                  | 10 ++++++
 20 files changed, 215 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8da5bb6..0dd4de7 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -103,6 +103,9 @@ OLAPStatus Compaction::construct_output_rowset_writer() {
     context.partition_id = _tablet->partition_id();
     context.tablet_schema_hash = _tablet->schema_hash();
     context.rowset_type = StorageEngine::instance()->compaction_rowset_type();
+    if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+        context.rowset_type = BETA_ROWSET;
+    }
     context.rowset_path_prefix = _tablet->tablet_path();
     context.tablet_schema = &(_tablet->tablet_schema());
     context.rowset_state = VISIBLE;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ac9cc9b..69f3f96 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -128,6 +128,9 @@ OLAPStatus DeltaWriter::init() {
     writer_context.partition_id = _req.partition_id;
     writer_context.tablet_schema_hash = _req.schema_hash;
     writer_context.rowset_type = _storage_engine->default_rowset_type();
+    if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+        writer_context.rowset_type = BETA_ROWSET;
+    }
     writer_context.rowset_path_prefix = _tablet->tablet_path();
     writer_context.tablet_schema = &(_tablet->tablet_schema());
     writer_context.rowset_state = PREPARED;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index afa2af9..fd18754 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -957,6 +957,11 @@ bool SchemaChangeWithSorting::process(
     reset_merged_rows();
     reset_filtered_rows();
 
+    bool use_beta_rowset = false;
+    if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+        use_beta_rowset = true;
+    }
+
     RowBlock* ref_row_block = nullptr;
     rowset_reader->next_block(&ref_row_block);
     while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
@@ -976,12 +981,16 @@ bool SchemaChangeWithSorting::process(
 
             // enter here while memory limitation is reached.
             RowsetSharedPtr rowset;
+            RowsetTypePB new_rowset_type = 
StorageEngine::instance()->default_rowset_type();
+            if (use_beta_rowset) {
+                new_rowset_type = BETA_ROWSET;
+            }
             if (!_internal_sorting(row_block_arr,
                                    Version(_temp_delta_versions.second,
                                            _temp_delta_versions.second),
                                    rowset_reader->version_hash(),
                                    new_tablet,
-                                   
StorageEngine::instance()->default_rowset_type(),
+                                   new_rowset_type,
                                    &rowset)) {
                 LOG(WARNING) << "failed to sorting internally.";
                 result = false;
@@ -1034,11 +1043,15 @@ bool SchemaChangeWithSorting::process(
         // enter here while memory limitation is reached.
         RowsetSharedPtr rowset = nullptr;
 
+        RowsetTypePB new_rowset_type = 
StorageEngine::instance()->default_rowset_type();
+        if (use_beta_rowset) {
+            new_rowset_type = BETA_ROWSET;
+        }
         if (!_internal_sorting(row_block_arr,
                                Version(_temp_delta_versions.second, 
_temp_delta_versions.second),
                                rowset_reader->version_hash(),
                                new_tablet,
-                               
StorageEngine::instance()->default_rowset_type(),
+                               new_rowset_type,
                                &rowset)) {
             LOG(WARNING) << "failed to sorting internally.";
             result = false;
@@ -1476,6 +1489,9 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
     writer_context.partition_id = (*base_rowset)->partition_id();
     writer_context.tablet_schema_hash = new_tablet->schema_hash();
     writer_context.rowset_type = 
StorageEngine::instance()->default_rowset_type();
+    if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+        writer_context.rowset_type = BETA_ROWSET;
+    }
     writer_context.rowset_path_prefix = new_tablet->tablet_path();
     writer_context.tablet_schema = &(new_tablet->tablet_schema());
     writer_context.rowset_state = PREPARED;
@@ -1702,6 +1718,11 @@ OLAPStatus 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         writer_context.tablet_schema_hash = new_tablet->schema_hash();
         // linked schema change can't change rowset type, therefore we 
preserve rowset type in schema change now
         writer_context.rowset_type = 
StorageEngine::instance()->default_rowset_type();
+        if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == 
BETA_ROWSET) {
+            // Use beta rowset to do schema change
+            // And in this case, linked schema change will not be used.
+            writer_context.rowset_type = BETA_ROWSET;
+        }
         writer_context.rowset_path_prefix = new_tablet->tablet_path();
         writer_context.tablet_schema = &(new_tablet->tablet_schema());
         writer_context.rowset_state = VISIBLE;
@@ -1911,6 +1932,12 @@ OLAPStatus 
SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
         *sc_directly = true;
     }
 
+    if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+        // if the default rowset type is alpha, and tablet meta has 
preferred_rowset_type
+        // field set to BETA_ROWST, just use directly type
+        *sc_directly = true;
+    }
+
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 11c201a..9473617 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1318,6 +1318,9 @@ OLAPStatus TabletManager::_create_tablet_meta(
                        shard_id, request.tablet_schema,
                        next_unique_id, col_ordinal_to_unique_id,
                        tablet_meta, tablet_uid);
+    if (request.__isset.storage_format && request.storage_format == 
TStorageFormat::V2) {
+        (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET);
+    }
     return res;
 }
 
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 590ddc9..b61d579 100755
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -75,7 +75,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id,
                        uint64_t shard_id, const TTabletSchema& tablet_schema,
                        uint32_t next_unique_id,
                        const std::unordered_map<uint32_t, uint32_t>& 
col_ordinal_to_unique_id, 
-                       TabletUid tablet_uid) : _tablet_uid(0, 0) {
+                       TabletUid tablet_uid) : _tablet_uid(0, 0),
+                       _preferred_rowset_type(ALPHA_ROWSET) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
     tablet_meta_pb.set_partition_id(partition_id);
@@ -365,6 +366,10 @@ OLAPStatus TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
     if (tablet_meta_pb.has_in_restore_mode()) {
         _in_restore_mode = tablet_meta_pb.in_restore_mode();
     }
+
+    if (tablet_meta_pb.has_preferred_rowset_type()) {
+        _preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
+    }
     return OLAP_SUCCESS;
 }
 
@@ -407,6 +412,11 @@ OLAPStatus TabletMeta::to_meta_pb(TabletMetaPB* 
tablet_meta_pb) {
     }
 
     tablet_meta_pb->set_in_restore_mode(in_restore_mode());
+
+    // to avoid modify tablet meta to the greatest extend
+    if (_preferred_rowset_type == BETA_ROWSET) {
+        tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
+    }
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 60831a7..a08d743 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -183,6 +183,14 @@ public:
 
     OLAPStatus set_partition_id(int64_t partition_id);
 
+    RowsetTypePB preferred_rowset_type() const {
+        return _preferred_rowset_type;
+    }
+
+    void set_preferred_rowset_type(RowsetTypePB preferred_rowset_type) {
+        _preferred_rowset_type = preferred_rowset_type;
+    }
+
 private:
     OLAPStatus _save_meta(DataDir* data_dir);
 
@@ -203,6 +211,7 @@ private:
     DelPredicateArray _del_pred_array;
     AlterTabletTaskSharedPtr _alter_task;
     bool _in_restore_mode = false;
+    RowsetTypePB _preferred_rowset_type = ALPHA_ROWSET;  // init_from_pb() only overwrites this when the PB field is present
 
     RWMutex _meta_lock;
 };
diff --git a/docs/documentation/cn/administrator-guide/variables.md 
b/docs/documentation/cn/administrator-guide/variables.md
index efa2bd5..2651160 100644
--- a/docs/documentation/cn/administrator-guide/variables.md
+++ b/docs/documentation/cn/administrator-guide/variables.md
@@ -310,3 +310,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
 * `default_rowset_type`
 
     用于设置计算节点存储引擎默认的存储格式。当前支持的存储格式包括:alpha/beta。
+
+* `use_v2_rollup`
+
+    用于控制查询使用segment v2存储格式的rollup索引获取数据。该变量用于上线segment v2的时候,进行验证使用;其他情况,不建议使用。
diff --git a/docs/documentation/en/administrator-guide/variables_EN.md 
b/docs/documentation/en/administrator-guide/variables_EN.md
index 48a2e98..f483f10 100644
--- a/docs/documentation/en/administrator-guide/variables_EN.md
+++ b/docs/documentation/en/administrator-guide/variables_EN.md
@@ -311,3 +311,7 @@ SET forward_to_master = concat('tr', 'u', 'e');
 * `default_rowset_type`
 
     Used for setting the default storage format of Backends storage engine. 
Valid options: alpha/beta
+
+* `use_v2_rollup`
+
+    Used to control whether queries use the segment v2 rollup index to read data. This variable is intended only for validation when upgrading to the segment v2 format; using it otherwise is not recommended.
diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/src/main/java/org/apache/doris/alter/Alter.java
index ce3f36a..fd1e88b 100644
--- a/fe/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/src/main/java/org/apache/doris/alter/Alter.java
@@ -233,6 +233,7 @@ public class Alter {
             }
 
             if (hasSchemaChange || hasModifyProp) {
+                // if modify storage type to v2, do schema change to convert 
all related tablets to segment v2 format
                 schemaChangeHandler.process(alterClauses, clusterName, db, 
olapTable);
             } else if (hasAddMaterializedView || hasDropRollup) {
                 materializedViewHandler.process(alterClauses, clusterName, db, 
olapTable);
diff --git 
a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index ce62006..0891e40 100644
--- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -54,6 +54,7 @@ import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -130,18 +131,32 @@ public class MaterializedViewHandler extends AlterHandler 
{
      */
     private void processAddRollup(AddRollupClause alterClause, Database db, 
OlapTable olapTable)
             throws DdlException, AnalysisException {
-        String baseIndexName = alterClause.getBaseRollupName();
         String rollupIndexName = alterClause.getRollupName();
+        String newStorageFormatIndexName = "__v2_" + olapTable.getName();
+        boolean changeStorageFormat = false;
+        if (rollupIndexName.equalsIgnoreCase(olapTable.getName())) {
+            // for upgrade test to create segment v2 rollup index by using the 
sql:
+            // alter table table_name add rollup table_name (columns) 
properties ("storage_format" = "v2");
+            Map<String, String> properties = alterClause.getProperties();
+            if (properties == null || 
!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT)
+                    || 
!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).equalsIgnoreCase("v2"))
 {
+                throw new DdlException("Table[" + olapTable.getName() + "] can 
not " +
+                        "add segment v2 rollup index without setting storage 
format to v2.");
+            }
+            rollupIndexName = newStorageFormatIndexName;
+            changeStorageFormat = true;
+        }
+
         // get base index schema
+        String baseIndexName = alterClause.getBaseRollupName();
         if (baseIndexName == null) {
             // use table name as base table name
             baseIndexName = olapTable.getName();
         }
         // Step1.1 check base table and base index
         // Step1.2 alter clause validation
-        LOG.info("process add rollup[{}] based on [{}]", rollupIndexName, 
baseIndexName);
         Long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable);
-        List<Column> rollupSchema = 
checkAndPrepareMaterializedView(alterClause, olapTable, baseIndexId);
+        List<Column> rollupSchema = 
checkAndPrepareMaterializedView(alterClause, olapTable, baseIndexId, 
changeStorageFormat);
 
         // Step2: create materialized view job
         createMaterializedViewJob(rollupIndexName, baseIndexName, 
rollupSchema, alterClause.getProperties(),
@@ -187,6 +202,10 @@ public class MaterializedViewHandler extends AlterHandler {
                                             baseIndexId, mvIndexId, 
baseIndexName, mvName,
                                             mvColumns, baseSchemaHash, 
mvSchemaHash,
                                             mvKeysType, mvShortKeyColumnCount);
+        String newStorageFormatIndexName = "__v2_" + olapTable.getName();
+        if (mvName.equals(newStorageFormatIndexName)) {
+            mvJob.setStorageFormat(TStorageFormat.V2);
+        }
 
         /*
          * create all rollup indexes. and set state.
@@ -298,10 +317,21 @@ public class MaterializedViewHandler extends AlterHandler 
{
     }
 
     private List<Column> checkAndPrepareMaterializedView(AddRollupClause 
addRollupClause, OlapTable olapTable,
-                                                         long baseIndexId)
+                                                         long baseIndexId, 
boolean changeStorageFormat)
             throws DdlException {
         String rollupIndexName = addRollupClause.getRollupName();
         List<String> rollupColumnNames = addRollupClause.getColumnNames();
+        if (changeStorageFormat) {
+            String newStorageFormatIndexName = "__v2_" + olapTable.getName();
+            rollupIndexName = newStorageFormatIndexName;
+            List<Column> columns = olapTable.getSchemaByIndexId(baseIndexId);
+            // create the same schema as base table
+            rollupColumnNames.clear();
+            for (Column column : columns) {
+                rollupColumnNames.add(column.getName());
+            }
+        }
+
         // 2. check if rollup index already exists
         if (olapTable.hasMaterializedIndex(rollupIndexName)) {
             throw new DdlException("Rollup index[" + rollupIndexName + "] 
already exists");
diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index fd750bc..9b8753e 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -45,6 +45,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -93,6 +94,8 @@ public class RollupJobV2 extends AlterJobV2 {
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
 
+    private TStorageFormat storageFormat = null;
+
     public RollupJobV2(long jobId, long dbId, long tableId, String tableName, 
long timeoutMs,
             long baseIndexId, long rollupIndexId, String baseIndexName, String 
rollupIndexName,
             List<Column> rollupSchema, int baseSchemaHash, int 
rollupSchemaHash,
@@ -128,6 +131,10 @@ public class RollupJobV2 extends AlterJobV2 {
         this.partitionIdToRollupIndex.put(partitionId, mvIndex);
     }
 
+    public void setStorageFormat(TStorageFormat storageFormat) {
+        this.storageFormat = storageFormat;
+    }
+
     /*
      * runPendingJob():
      * 1. Create all rollup replicas and wait them finished.
@@ -189,7 +196,9 @@ public class RollupJobV2 extends AlterJobV2 {
                                 rollupKeysType, TStorageType.COLUMN, 
storageMedium,
                                 rollupSchema, tbl.getCopiedBfColumns(), 
tbl.getBfFpp(), countDownLatch);
                         
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), 
baseSchemaHash);
-
+                        if (this.storageFormat != null) {
+                            
createReplicaTask.setStorageFormat(this.storageFormat);
+                        }
                         batchTask.addTask(createReplicaTask);
                     } // end for rollupReplicas
                 } // end for rollupTablets
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a86985e..cc9e182 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -66,6 +66,7 @@ import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.ClearAlterTask;
 import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -841,12 +842,18 @@ public class SchemaChangeHandler extends AlterHandler {
         
         // property 3: timeout
         long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, 
Config.alter_table_timeout_second);
-
+        TStorageFormat storageFormat = 
PropertyAnalyzer.analyzeStorageFormat(propertyMap);
         // create job
         Catalog catalog = Catalog.getCurrentCatalog();
         long jobId = catalog.getNextId();
         SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, dbId, 
olapTable.getId(), olapTable.getName(), timeoutSecond * 1000);
         schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp);
+
+        // If StorageFormat is set to TStorageFormat.V2
+        // which will create tablet with preferred_rowset_type set to BETA
+        // for both base table and rollup index
+        schemaChangeJob.setStorageFormat(storageFormat);
+
         // begin checking each table
         // ATTN: DO NOT change any meta in this loop 
         long tableId = olapTable.getId();
@@ -903,6 +910,8 @@ public class SchemaChangeHandler extends AlterHandler {
                         break;
                     }
                 }
+            } else if (storageFormat == TStorageFormat.V2) {
+                needAlter = true;
             }
 
             if (!needAlter) {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f50232c..929b324 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -45,6 +45,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -98,6 +99,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     // The schema change job will wait all transactions before this txn id 
finished, then send the schema change tasks.
     protected long watershedTxnId = -1;
 
+    private TStorageFormat storageFormat = null;
+
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
 
@@ -139,6 +142,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         this.bfFpp = bfFpp;
     }
 
+    public void setStorageFormat(TStorageFormat storageFormat) {
+        this.storageFormat = storageFormat;
+    }
+
     /*
      * runPendingJob():
      * 1. Create all replicas of all shadow indexes and wait them finished.
@@ -202,6 +209,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     tbl.getKeysType(), TStorageType.COLUMN, 
storageMedium,
                                     shadowSchema, bfColumns, bfFpp, 
countDownLatch);
                             
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, 
shadowIdxId).get(shadowTabletId), originSchemaHash);
+                            if (this.storageFormat != null) {
+                                
createReplicaTask.setStorageFormat(this.storageFormat);
+                            }
                             
                             batchTask.addTask(createReplicaTask);
                         } // end for rollupReplicas
diff --git 
a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java 
b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
index 984606d..bf76668 100644
--- 
a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
+++ 
b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java
@@ -63,6 +63,11 @@ public class ModifyTablePropertiesClause extends AlterClause 
{
         } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_BF_COLUMNS)
                 || properties.containsKey(PropertyAnalyzer.PROPERTIES_BF_FPP)) 
{
             // do nothing, these 2 properties will be analyzed when creating 
alter job
+        } else if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT)) {
+            if 
(!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).equalsIgnoreCase("v2"))
 {
+                throw new AnalysisException(
+                        "Property " + 
PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT + " should be v2");
+            }
         } else {
             throw new AnalysisException("Unknown table property: " + 
properties.keySet());
         }
diff --git 
a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index ae5d192..9c52bbc 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -73,6 +74,13 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_DISTRIBUTION_TYPE = 
"distribution_type";
     public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = 
"send_clear_alter_tasks";
 
+    /*
+     * for upgrade alpha rowset to beta rowset, valid value: v1, v2
+     * v1: alpha rowset
+     * v2: beta rowset
+     */
+    public static final String PROPERTIES_STORAGE_FORMAT = "storage_format";
+
     public static DataProperty analyzeDataProperty(Map<String, String> 
properties, DataProperty oldDataProperty)
             throws AnalysisException {
         DataProperty dataProperty = oldDataProperty;
@@ -373,4 +381,22 @@ public class PropertyAnalyzer {
         }
         return timeout;
     }
+
+    // analyzeStorageFormat parses (and consumes) the "storage_format" property: "v1" -> V1, "v2" -> V2, otherwise DEFAULT.
+    // sql: alter table table_name set ("storage_format" = "v2")
+    // Use this sql to convert all tablets (base and rollup index) to the segment v2 format.
+    public static TStorageFormat analyzeStorageFormat(Map<String, String> properties) {
+        String storageFormat = "";
+        if (properties != null && properties.containsKey(PROPERTIES_STORAGE_FORMAT)) {
+            storageFormat = properties.get(PROPERTIES_STORAGE_FORMAT);
+            properties.remove(PROPERTIES_STORAGE_FORMAT);
+        }
+        if ("v1".equalsIgnoreCase(storageFormat)) {
+            return TStorageFormat.V1;
+        } else if ("v2".equalsIgnoreCase(storageFormat)) {
+            return TStorageFormat.V2;
+        } else {
+            return TStorageFormat.DEFAULT;
+        }
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java 
b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
index 6100405..3d7315c 100644
--- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
@@ -36,6 +36,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.doris.qe.ConnectContext;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -95,6 +96,27 @@ public final class RollupSelector {
                 }
             }
         }
+        String tableName = table.getName();
+        String v2RollupIndexName = "__v2_" + tableName;
+        Long v2RollupIndex = table.getIndexIdByName(v2RollupIndexName);
+        long baseIndexId = table.getBaseIndexId();
+        ConnectContext connectContext = ConnectContext.get();
+        boolean useV2Rollup = false;
+        if (connectContext != null) {
+            useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup();
+        }
+        if (baseIndexId == selectedIndexId && v2RollupIndex != null && 
useV2Rollup) {
+            // if the selectedIndexId is baseIndexId
+            // check whether there is a V2 rollup index and useV2Rollup flag 
is true,
+            // if both true, use v2 rollup index
+            selectedIndexId = v2RollupIndex;
+        }
+        if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == 
selectedIndexId) {
+            // if the selectedIndexId is v2RollupIndex
+            // but useV2Rollup is false, use baseIndexId as selectedIndexId
+            // just make sure to use baseIndex instead of v2RollupIndex if the 
useV2Rollup is false
+            selectedIndexId = baseIndexId;
+        }
         return selectedIndexId;
     }
 
diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
index a8f34aa..63049c1 100644
--- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -89,6 +89,7 @@ public class SessionVariable implements Serializable, 
Writable {
      */
     public static final String LOAD_MEM_LIMIT = "load_mem_limit";
     public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type";
+    public static final String USE_V2_ROLLUP = "use_v2_rollup";
 
     // max memory used on every backend.
     @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
@@ -217,6 +218,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // the default rowset type flag which will be passed to Backends througth 
heartbeat
     @VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE)
     public static String defaultRowsetType = "alpha";
+    @VariableMgr.VarAttr(name = USE_V2_ROLLUP)
+    private boolean useV2Rollup = false;
 
     public long getMaxExecMemByte() {
         return maxExecMemByte;
@@ -384,6 +387,12 @@ public class SessionVariable implements Serializable, 
Writable {
         return forwardToMaster;
     }
 
+    public boolean getUseV2Rollup() { return useV2Rollup; }
+
+    public void setUseV2Rollup(boolean useV2Rollup) {
+        this.useV2Rollup = useV2Rollup;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
diff --git a/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java 
b/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 9da5f4f..08dc585 100644
--- a/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -29,6 +29,7 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTabletSchema;
 import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.thrift.TStorageFormat;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -65,6 +66,8 @@ public class CreateReplicaTask extends AgentTask {
     private long baseTabletId = -1;
     private int baseSchemaHash = -1;
 
+    private TStorageFormat storageFormat = null;
+
     public CreateReplicaTask(long backendId, long dbId, long tableId, long 
partitionId, long indexId, long tabletId,
                              short shortKeyColumnCount, int schemaHash, long 
version, long versionHash,
                              KeysType keysType, TStorageType storageType,
@@ -120,6 +123,10 @@ public class CreateReplicaTask extends AgentTask {
         this.baseSchemaHash = baseSchemaHash;
     }
 
+    public void setStorageFormat(TStorageFormat storageFormat) {
+        this.storageFormat = storageFormat;
+    }
+
     public TCreateTabletReq toThrift() {
         TCreateTabletReq createTabletReq = new TCreateTabletReq();
         createTabletReq.setTablet_id(tabletId);
@@ -166,6 +173,10 @@ public class CreateReplicaTask extends AgentTask {
             createTabletReq.setBase_schema_hash(baseSchemaHash);
         }
 
+        if (storageFormat != null) {
+            createTabletReq.setStorage_format(storageFormat);
+        }
+
         return createTabletReq;
     }
 }
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 953dc59..73571f9 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -296,6 +296,7 @@ message TabletMetaPB {
     // a uniqued id to identified tablet with same tablet_id and schema hash
     optional PUniqueId tablet_uid = 14;
     optional int64 end_rowset_id = 15;
+    optional RowsetTypePB preferred_rowset_type = 16;
 }
 
 message OLAPIndexHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index ad4e15f..8c5b038 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -41,6 +41,15 @@ struct TTabletSchema {
     6: optional double bloom_filter_fpp
 }
 
+// this enum stands for different storage format in src_backends
+// V1 for Segment-V1
+// V2 for Segment-V2
+enum TStorageFormat {
+    DEFAULT,
+    V1,
+    V2
+}
+
 struct TCreateTabletReq {
     1: required Types.TTabletId tablet_id
     2: required TTabletSchema tablet_schema
@@ -58,6 +67,7 @@ struct TCreateTabletReq {
     11: optional i64 allocation_term
     // indicate whether this tablet is a compute storage split mode, we call 
it "eco mode"
     12: optional bool is_eco_mode
+    13: optional TStorageFormat storage_format
 }
 
 struct TDropTabletReq {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to