This is an automated email from the ASF dual-hosted git repository. zhaoc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 63ea05f Add convert tablet rowset type (#2294) 63ea05f is described below commit 63ea05f9c72e80b71ba64426776f36739e0be1d0 Author: kangpinghuang <kphu...@163.com> AuthorDate: Wed Dec 18 18:49:47 2019 +0800 Add convert tablet rowset type (#2294) to solve issue #2246. The scheme is as follows: add an optional preferred_rowset_type in TabletMeta for V2 format rollup index tablets; add a boolean session variable use_v2_rollup which, if set to true, makes the query use the v2 storage format rollup index to process the query. Test queries will be sent to the online service to verify the correctness of segment-v2 by sending the same queries to fe with use_v2_rollup set or not set, and checking whether the returned results are the same. --- be/src/olap/compaction.cpp | 3 ++ be/src/olap/delta_writer.cpp | 3 ++ be/src/olap/schema_change.cpp | 31 ++++++++++++++++-- be/src/olap/tablet_manager.cpp | 3 ++ be/src/olap/tablet_meta.cpp | 12 ++++++- be/src/olap/tablet_meta.h | 9 +++++ .../cn/administrator-guide/variables.md | 4 +++ .../en/administrator-guide/variables_EN.md | 4 +++ fe/src/main/java/org/apache/doris/alter/Alter.java | 1 + .../doris/alter/MaterializedViewHandler.java | 38 +++++++++++++++++++--- .../java/org/apache/doris/alter/RollupJobV2.java | 11 ++++++- .../apache/doris/alter/SchemaChangeHandler.java | 11 ++++++- .../org/apache/doris/alter/SchemaChangeJobV2.java | 10 ++++++ .../analysis/ModifyTablePropertiesClause.java | 5 +++ .../apache/doris/common/util/PropertyAnalyzer.java | 26 +++++++++++++++ .../org/apache/doris/planner/RollupSelector.java | 22 +++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 9 +++++ .../org/apache/doris/task/CreateReplicaTask.java | 11 +++++++ gensrc/proto/olap_file.proto | 1 + gensrc/thrift/AgentService.thrift | 10 ++++++ 20 files changed, 215 insertions(+), 9 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 8da5bb6..0dd4de7 100644 --- 
a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -103,6 +103,9 @@ OLAPStatus Compaction::construct_output_rowset_writer() { context.partition_id = _tablet->partition_id(); context.tablet_schema_hash = _tablet->schema_hash(); context.rowset_type = StorageEngine::instance()->compaction_rowset_type(); + if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + context.rowset_type = BETA_ROWSET; + } context.rowset_path_prefix = _tablet->tablet_path(); context.tablet_schema = &(_tablet->tablet_schema()); context.rowset_state = VISIBLE; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index ac9cc9b..69f3f96 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -128,6 +128,9 @@ OLAPStatus DeltaWriter::init() { writer_context.partition_id = _req.partition_id; writer_context.tablet_schema_hash = _req.schema_hash; writer_context.rowset_type = _storage_engine->default_rowset_type(); + if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + writer_context.rowset_type = BETA_ROWSET; + } writer_context.rowset_path_prefix = _tablet->tablet_path(); writer_context.tablet_schema = &(_tablet->tablet_schema()); writer_context.rowset_state = PREPARED; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index afa2af9..fd18754 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -957,6 +957,11 @@ bool SchemaChangeWithSorting::process( reset_merged_rows(); reset_filtered_rows(); + bool use_beta_rowset = false; + if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + use_beta_rowset = true; + } + RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); while (ref_row_block != nullptr && ref_row_block->has_remaining()) { @@ -976,12 +981,16 @@ bool SchemaChangeWithSorting::process( // enter here while memory limitation is reached. 
RowsetSharedPtr rowset; + RowsetTypePB new_rowset_type = StorageEngine::instance()->default_rowset_type(); + if (use_beta_rowset) { + new_rowset_type = BETA_ROWSET; + } if (!_internal_sorting(row_block_arr, Version(_temp_delta_versions.second, _temp_delta_versions.second), rowset_reader->version_hash(), new_tablet, - StorageEngine::instance()->default_rowset_type(), + new_rowset_type, &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; @@ -1034,11 +1043,15 @@ bool SchemaChangeWithSorting::process( // enter here while memory limitation is reached. RowsetSharedPtr rowset = nullptr; + RowsetTypePB new_rowset_type = StorageEngine::instance()->default_rowset_type(); + if (use_beta_rowset) { + new_rowset_type = BETA_ROWSET; + } if (!_internal_sorting(row_block_arr, Version(_temp_delta_versions.second, _temp_delta_versions.second), rowset_reader->version_hash(), new_tablet, - StorageEngine::instance()->default_rowset_type(), + new_rowset_type, &rowset)) { LOG(WARNING) << "failed to sorting internally."; result = false; @@ -1476,6 +1489,9 @@ OLAPStatus SchemaChangeHandler::schema_version_convert( writer_context.partition_id = (*base_rowset)->partition_id(); writer_context.tablet_schema_hash = new_tablet->schema_hash(); writer_context.rowset_type = StorageEngine::instance()->default_rowset_type(); + if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + writer_context.rowset_type = BETA_ROWSET; + } writer_context.rowset_path_prefix = new_tablet->tablet_path(); writer_context.tablet_schema = &(new_tablet->tablet_schema()); writer_context.rowset_state = PREPARED; @@ -1702,6 +1718,11 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa writer_context.tablet_schema_hash = new_tablet->schema_hash(); // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now writer_context.rowset_type = StorageEngine::instance()->default_rowset_type(); + if 
(sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + // Use beta rowset to do schema change + // And in this case, linked schema change will not be used. + writer_context.rowset_type = BETA_ROWSET; + } writer_context.rowset_path_prefix = new_tablet->tablet_path(); writer_context.tablet_schema = &(new_tablet->tablet_schema()); writer_context.rowset_state = VISIBLE; @@ -1911,6 +1932,12 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet, *sc_directly = true; } + if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) { + // if the default rowset type is alpha, and tablet meta has preferred_rowset_type + // field set to BETA_ROWST, just use directly type + *sc_directly = true; + } + return OLAP_SUCCESS; } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 11c201a..9473617 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1318,6 +1318,9 @@ OLAPStatus TabletManager::_create_tablet_meta( shard_id, request.tablet_schema, next_unique_id, col_ordinal_to_unique_id, tablet_meta, tablet_uid); + if (request.__isset.storage_format && request.storage_format == TStorageFormat::V2) { + (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET); + } return res; } diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 590ddc9..b61d579 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -75,7 +75,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id, const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id, - TabletUid tablet_uid) : _tablet_uid(0, 0) { + TabletUid tablet_uid) : _tablet_uid(0, 0), + _preferred_rowset_type(ALPHA_ROWSET) { TabletMetaPB tablet_meta_pb; tablet_meta_pb.set_table_id(table_id); tablet_meta_pb.set_partition_id(partition_id); @@ -365,6 +366,10 @@ OLAPStatus 
TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { if (tablet_meta_pb.has_in_restore_mode()) { _in_restore_mode = tablet_meta_pb.in_restore_mode(); } + + if (tablet_meta_pb.has_preferred_rowset_type()) { + _preferred_rowset_type = tablet_meta_pb.preferred_rowset_type(); + } return OLAP_SUCCESS; } @@ -407,6 +412,11 @@ OLAPStatus TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { } tablet_meta_pb->set_in_restore_mode(in_restore_mode()); + + // to avoid modify tablet meta to the greatest extend + if (_preferred_rowset_type == BETA_ROWSET) { + tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type); + } return OLAP_SUCCESS; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 60831a7..a08d743 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -183,6 +183,14 @@ public: OLAPStatus set_partition_id(int64_t partition_id); + RowsetTypePB preferred_rowset_type() const { + return _preferred_rowset_type; + } + + void set_preferred_rowset_type(RowsetTypePB preferred_rowset_type) { + _preferred_rowset_type = preferred_rowset_type; + } + private: OLAPStatus _save_meta(DataDir* data_dir); @@ -203,6 +211,7 @@ private: DelPredicateArray _del_pred_array; AlterTabletTaskSharedPtr _alter_task; bool _in_restore_mode = false; + RowsetTypePB _preferred_rowset_type; RWMutex _meta_lock; }; diff --git a/docs/documentation/cn/administrator-guide/variables.md b/docs/documentation/cn/administrator-guide/variables.md index efa2bd5..2651160 100644 --- a/docs/documentation/cn/administrator-guide/variables.md +++ b/docs/documentation/cn/administrator-guide/variables.md @@ -310,3 +310,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `default_rowset_type` 用于设置计算节点存储引擎默认的存储格式。当前支持的存储格式包括:alpha/beta。 + +* `use_v2_rollup` + + 用于控制查询使用segment v2存储格式的rollup索引获取数据。该变量用于上线segment v2的时候,进行验证使用;其他情况,不建议使用。 diff --git a/docs/documentation/en/administrator-guide/variables_EN.md 
b/docs/documentation/en/administrator-guide/variables_EN.md index 48a2e98..f483f10 100644 --- a/docs/documentation/en/administrator-guide/variables_EN.md +++ b/docs/documentation/en/administrator-guide/variables_EN.md @@ -311,3 +311,7 @@ SET forward_to_master = concat('tr', 'u', 'e'); * `default_rowset_type` Used for setting the default storage format of Backends storage engine. Valid options: alpha/beta + +* `use_v2_rollup` + + Used to control the sql query to use segment v2 rollup index to get data. This variable is only used for validation when upgrading to segment v2 feature. Otherwise, not recommended to use. diff --git a/fe/src/main/java/org/apache/doris/alter/Alter.java b/fe/src/main/java/org/apache/doris/alter/Alter.java index ce3f36a..fd1e88b 100644 --- a/fe/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/src/main/java/org/apache/doris/alter/Alter.java @@ -233,6 +233,7 @@ public class Alter { } if (hasSchemaChange || hasModifyProp) { + // if modify storage type to v2, do schema change to convert all related tablets to segment v2 format schemaChangeHandler.process(alterClauses, clusterName, db, olapTable); } else if (hasAddMaterializedView || hasDropRollup) { materializedViewHandler.process(alterClauses, clusterName, db, olapTable); diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index ce62006..0891e40 100644 --- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -54,6 +54,7 @@ import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -130,18 +131,32 @@ public class MaterializedViewHandler extends 
AlterHandler { */ private void processAddRollup(AddRollupClause alterClause, Database db, OlapTable olapTable) throws DdlException, AnalysisException { - String baseIndexName = alterClause.getBaseRollupName(); String rollupIndexName = alterClause.getRollupName(); + String newStorageFormatIndexName = "__v2_" + olapTable.getName(); + boolean changeStorageFormat = false; + if (rollupIndexName.equalsIgnoreCase(olapTable.getName())) { + // for upgrade test to create segment v2 rollup index by using the sql: + // alter table table_name add rollup table_name (columns) properties ("storage_format" = "v2"); + Map<String, String> properties = alterClause.getProperties(); + if (properties == null || !properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT) + || !properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).equalsIgnoreCase("v2")) { + throw new DdlException("Table[" + olapTable.getName() + "] can not " + + "add segment v2 rollup index without setting storage format to v2."); + } + rollupIndexName = newStorageFormatIndexName; + changeStorageFormat = true; + } + // get base index schema + String baseIndexName = alterClause.getBaseRollupName(); if (baseIndexName == null) { // use table name as base table name baseIndexName = olapTable.getName(); } // Step1.1 check base table and base index // Step1.2 alter clause validation - LOG.info("process add rollup[{}] based on [{}]", rollupIndexName, baseIndexName); Long baseIndexId = checkAndGetBaseIndex(baseIndexName, olapTable); - List<Column> rollupSchema = checkAndPrepareMaterializedView(alterClause, olapTable, baseIndexId); + List<Column> rollupSchema = checkAndPrepareMaterializedView(alterClause, olapTable, baseIndexId, changeStorageFormat); // Step2: create materialized view job createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, alterClause.getProperties(), @@ -187,6 +202,10 @@ public class MaterializedViewHandler extends AlterHandler { baseIndexId, mvIndexId, baseIndexName, mvName, 
mvColumns, baseSchemaHash, mvSchemaHash, mvKeysType, mvShortKeyColumnCount); + String newStorageFormatIndexName = "__v2_" + olapTable.getName(); + if (mvName.equals(newStorageFormatIndexName)) { + mvJob.setStorageFormat(TStorageFormat.V2); + } /* * create all rollup indexes. and set state. @@ -298,10 +317,21 @@ public class MaterializedViewHandler extends AlterHandler { } private List<Column> checkAndPrepareMaterializedView(AddRollupClause addRollupClause, OlapTable olapTable, - long baseIndexId) + long baseIndexId, boolean changeStorageFormat) throws DdlException { String rollupIndexName = addRollupClause.getRollupName(); List<String> rollupColumnNames = addRollupClause.getColumnNames(); + if (changeStorageFormat) { + String newStorageFormatIndexName = "__v2_" + olapTable.getName(); + rollupIndexName = newStorageFormatIndexName; + List<Column> columns = olapTable.getSchemaByIndexId(baseIndexId); + // create the same schema as base table + rollupColumnNames.clear(); + for (Column column : columns) { + rollupColumnNames.add(column.getName()); + } + } + // 2. 
check if rollup index already exists if (olapTable.hasMaterializedIndex(rollupIndexName)) { throw new DdlException("Rollup index[" + rollupIndexName + "] already exists"); diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java index fd750bc..9b8753e 100644 --- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -45,6 +45,7 @@ import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -93,6 +94,8 @@ public class RollupJobV2 extends AlterJobV2 { // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); + private TStorageFormat storageFormat = null; + public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, @@ -128,6 +131,10 @@ public class RollupJobV2 extends AlterJobV2 { this.partitionIdToRollupIndex.put(partitionId, mvIndex); } + public void setStorageFormat(TStorageFormat storageFormat) { + this.storageFormat = storageFormat; + } + /* * runPendingJob(): * 1. Create all rollup replicas and wait them finished. 
@@ -189,7 +196,9 @@ public class RollupJobV2 extends AlterJobV2 { rollupKeysType, TStorageType.COLUMN, storageMedium, rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); - + if (this.storageFormat != null) { + createReplicaTask.setStorageFormat(this.storageFormat); + } batchTask.addTask(createReplicaTask); } // end for rollupReplicas } // end for rollupTablets diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index a86985e..cc9e182 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -66,6 +66,7 @@ import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.ClearAlterTask; import org.apache.doris.thrift.TStorageMedium; +import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -841,12 +842,18 @@ public class SchemaChangeHandler extends AlterHandler { // property 3: timeout long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second); - + TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap); // create job Catalog catalog = Catalog.getCurrentCatalog(); long jobId = catalog.getNextId(); SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, dbId, olapTable.getId(), olapTable.getName(), timeoutSecond * 1000); schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp); + + // If StorageFormat is set to TStorageFormat.V2 + // which will create tablet with preferred_rowset_type set to BETA + // for both base table and rollup index + schemaChangeJob.setStorageFormat(storageFormat); + // begin checking each table // ATTN: DO NOT change any meta in this loop long 
tableId = olapTable.getId(); @@ -903,6 +910,8 @@ public class SchemaChangeHandler extends AlterHandler { break; } } + } else if (storageFormat == TStorageFormat.V2) { + needAlter = true; } if (!needAlter) { diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f50232c..929b324 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -45,6 +45,7 @@ import org.apache.doris.task.CreateReplicaTask; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -98,6 +99,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks. protected long watershedTxnId = -1; + private TStorageFormat storageFormat = null; + // save all schema change tasks private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask(); @@ -139,6 +142,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 { this.bfFpp = bfFpp; } + public void setStorageFormat(TStorageFormat storageFormat) { + this.storageFormat = storageFormat; + } + /* * runPendingJob(): * 1. Create all replicas of all shadow indexes and wait them finished. 
@@ -202,6 +209,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.getKeysType(), TStorageType.COLUMN, storageMedium, shadowSchema, bfColumns, bfFpp, countDownLatch); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash); + if (this.storageFormat != null) { + createReplicaTask.setStorageFormat(this.storageFormat); + } batchTask.addTask(createReplicaTask); } // end for rollupReplicas diff --git a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 984606d..bf76668 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -63,6 +63,11 @@ public class ModifyTablePropertiesClause extends AlterClause { } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BF_COLUMNS) || properties.containsKey(PropertyAnalyzer.PROPERTIES_BF_FPP)) { // do nothing, these 2 properties will be analyzed when creating alter job + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT)) { + if (!properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).equalsIgnoreCase("v2")) { + throw new AnalysisException( + "Property " + PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT + " should be v2"); + } } else { throw new AnalysisException("Unknown table property: " + properties.keySet()); } diff --git a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index ae5d192..9c52bbc 100644 --- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -30,6 +30,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; import 
org.apache.doris.thrift.TStorageType; +import org.apache.doris.thrift.TStorageFormat; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -73,6 +74,13 @@ public class PropertyAnalyzer { public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type"; public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks"; + /* + * for upgrade alpha rowset to beta rowset, valid value: v1, v2 + * v1: alpha rowset + * v2: beta rowset + */ + public static final String PROPERTIES_STORAGE_FORMAT = "storage_format"; + public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty) throws AnalysisException { DataProperty dataProperty = oldDataProperty; @@ -373,4 +381,22 @@ public class PropertyAnalyzer { } return timeout; } + + // analyzeStorageFormat will parse the storage format from properties + // sql: alter table tablet_name set ("storage_format" = "v2") + // Use this sql to convert all tablets(base and rollup index) to a new format segment + public static TStorageFormat analyzeStorageFormat(Map<String, String> properties) { + String storage_format = ""; + if (properties != null && properties.containsKey(PROPERTIES_STORAGE_FORMAT)) { + storage_format = properties.get(PROPERTIES_STORAGE_FORMAT); + properties.remove(PROPERTIES_TIMEOUT); + } + if (storage_format.equalsIgnoreCase("v1")) { + return TStorageFormat.V1; + } else if(storage_format.equalsIgnoreCase("v2")) { + return TStorageFormat.V2; + } else { + return TStorageFormat.DEFAULT; + } + } } diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java index 6100405..3d7315c 100644 --- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java +++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java @@ -36,6 +36,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import 
com.google.common.collect.Sets; +import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -95,6 +96,27 @@ public final class RollupSelector { } } } + String tableName = table.getName(); + String v2RollupIndexName = "__v2_" + tableName; + Long v2RollupIndex = table.getIndexIdByName(v2RollupIndexName); + long baseIndexId = table.getBaseIndexId(); + ConnectContext connectContext = ConnectContext.get(); + boolean useV2Rollup = false; + if (connectContext != null) { + useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup(); + } + if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) { + // if the selectedIndexId is baseIndexId + // check whether there is a V2 rollup index and useV2Rollup flag is true, + // if both true, use v2 rollup index + selectedIndexId = v2RollupIndex; + } + if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) { + // if the selectedIndexId is v2RollupIndex + // but useV2Rollup is false, use baseIndexId as selectedIndexId + // just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false + selectedIndexId = baseIndexId; + } return selectedIndexId; } diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index a8f34aa..63049c1 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -89,6 +89,7 @@ public class SessionVariable implements Serializable, Writable { */ public static final String LOAD_MEM_LIMIT = "load_mem_limit"; public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type"; + public static final String USE_V2_ROLLUP = "use_v2_rollup"; // max memory used on every backend. 
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -217,6 +218,8 @@ public class SessionVariable implements Serializable, Writable { // the default rowset type flag which will be passed to Backends througth heartbeat @VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE) public static String defaultRowsetType = "alpha"; + @VariableMgr.VarAttr(name = USE_V2_ROLLUP) + private boolean useV2Rollup = false; public long getMaxExecMemByte() { return maxExecMemByte; @@ -384,6 +387,12 @@ public class SessionVariable implements Serializable, Writable { return forwardToMaster; } + public boolean getUseV2Rollup() { return useV2Rollup; } + + public void setUseV2Rollup(boolean useV2Rollup) { + this.useV2Rollup = useV2Rollup; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 9da5f4f..08dc585 100644 --- a/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -29,6 +29,7 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTabletSchema; import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TStorageFormat; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,6 +66,8 @@ public class CreateReplicaTask extends AgentTask { private long baseTabletId = -1; private int baseSchemaHash = -1; + private TStorageFormat storageFormat = null; + public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, short shortKeyColumnCount, int schemaHash, long version, long versionHash, KeysType keysType, TStorageType storageType, @@ -120,6 +123,10 @@ public class CreateReplicaTask extends AgentTask { this.baseSchemaHash = baseSchemaHash; } + public void 
setStorageFormat(TStorageFormat storageFormat) { + this.storageFormat = storageFormat; + } + public TCreateTabletReq toThrift() { TCreateTabletReq createTabletReq = new TCreateTabletReq(); createTabletReq.setTablet_id(tabletId); @@ -166,6 +173,10 @@ public class CreateReplicaTask extends AgentTask { createTabletReq.setBase_schema_hash(baseSchemaHash); } + if (storageFormat != null) { + createTabletReq.setStorage_format(storageFormat); + } + return createTabletReq; } } diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 953dc59..73571f9 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -296,6 +296,7 @@ message TabletMetaPB { // a uniqued id to identified tablet with same tablet_id and schema hash optional PUniqueId tablet_uid = 14; optional int64 end_rowset_id = 15; + optional RowsetTypePB preferred_rowset_type = 16; } message OLAPIndexHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index ad4e15f..8c5b038 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -41,6 +41,15 @@ struct TTabletSchema { 6: optional double bloom_filter_fpp } +// this enum stands for different storage format in src_backends +// V1 for Segment-V1 +// V2 for Segment-V2 +enum TStorageFormat { + DEFAULT, + V1, + V2 +} + struct TCreateTabletReq { 1: required Types.TTabletId tablet_id 2: required TTabletSchema tablet_schema @@ -58,6 +67,7 @@ struct TCreateTabletReq { 11: optional i64 allocation_term // indicate whether this tablet is a compute storage split mode, we call it "eco mode" 12: optional bool is_eco_mode + 13: optional TStorageFormat storage_format } struct TDropTabletReq { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org