This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dc9d092e4fc [feature](stats)Get partition update rows. (#34908) dc9d092e4fc is described below commit dc9d092e4fcb2b49ab8f6904a3da290b429f4788 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Fri May 17 11:21:59 2024 +0800 [feature](stats)Get partition update rows. (#34908) To support partition level stats collection, we need to record partition level update rows for each load transaction. This PR adds that support. BE returns a tableId -> (tabletId -> updateRows) map to FE; FE combines the row counts of the tablets belonging to the same partition and stores the result in TableStatsMeta.partitionUpdateRows. Previously, BE returned a tableId -> updateRows map to FE, which didn't contain partition level info. --- be/src/agent/task_worker_pool.cpp | 9 +- be/src/olap/task/engine_publish_version_task.cpp | 8 +- be/src/olap/task/engine_publish_version_task.h | 6 +- .../apache/doris/datasource/InternalCatalog.java | 10 +- .../java/org/apache/doris/master/MasterImpl.java | 4 +- .../apache/doris/statistics/AnalysisManager.java | 123 ++++++++++++++++++--- .../apache/doris/statistics/TableStatsMeta.java | 3 + .../apache/doris/statistics/UpdateRowsEvent.java | 52 ++++++++- .../org/apache/doris/task/PublishVersionTask.java | 12 +- .../doris/transaction/DatabaseTransactionMgr.java | 17 +-- .../doris/transaction/PublishVersionDaemon.java | 36 +++--- .../apache/doris/transaction/TransactionState.java | 14 +-- gensrc/thrift/MasterService.thrift | 1 + 13 files changed, 219 insertions(+), 76 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 3fb5f3dfcf2..37feaf18dcd 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1721,17 +1721,17 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& std::map<TTabletId, TVersion> succ_tablets; // partition_id, tablet_id, publish_version 
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets; - std::map<TTableId, int64_t> table_id_to_num_delta_rows; + std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows; uint32_t retry_time = 0; Status status; constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3; while (retry_time < PUBLISH_VERSION_MAX_RETRY) { succ_tablets.clear(); error_tablet_ids.clear(); - table_id_to_num_delta_rows.clear(); + table_id_to_tablet_id_to_num_delta_rows.clear(); EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids, &succ_tablets, &discontinuous_version_tablets, - &table_id_to_num_delta_rows); + &table_id_to_tablet_id_to_num_delta_rows); SCOPED_ATTACH_TASK(engine_task.mem_tracker()); status = engine_task.execute(); if (status.ok()) { @@ -1834,7 +1834,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& finish_task_request.__set_succ_tablets(succ_tablets); finish_task_request.__set_error_tablet_ids( std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end())); - finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows); + finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows( + table_id_to_tablet_id_to_num_delta_rows); finish_task(finish_task_request); remove_task_info(req.task_type, req.signature); } diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 41d28a6124b..acdcebae165 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -75,13 +75,13 @@ EnginePublishVersionTask::EnginePublishVersionTask( StorageEngine& engine, const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets, std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets, - std::map<TTableId, int64_t>* 
table_id_to_num_delta_rows) + std::map<TTableId, std::map<TTabletId, int64_t>>* table_id_to_tablet_id_to_num_delta_rows) : _engine(engine), _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids), _succ_tablets(succ_tablets), _discontinuous_version_tablets(discontinuous_version_tablets), - _table_id_to_num_delta_rows(table_id_to_num_delta_rows) { + _table_id_to_tablet_id_to_num_delta_rows(table_id_to_tablet_id_to_num_delta_rows) { _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "TabletPublishTxnTask"); } @@ -340,7 +340,9 @@ void EnginePublishVersionTask::_calculate_tbl_num_delta_rows( continue; } auto table_id = tablet->get_table_id(); - (*_table_id_to_num_delta_rows)[table_id] += kv.second; + if (kv.second > 0) { + (*_table_id_to_tablet_id_to_num_delta_rows)[table_id][kv.first] += kv.second; + } } } diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index e4824176368..761c9358cd9 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -92,7 +92,8 @@ public: StorageEngine& engine, const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets, std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets, - std::map<TTableId, int64_t>* table_id_to_num_delta_rows); + std::map<TTableId, std::map<TTabletId, int64_t>>* + table_id_to_tablet_id_to_num_delta_rows); ~EnginePublishVersionTask() override = default; Status execute() override; @@ -109,7 +110,8 @@ private: std::set<TTabletId>* _error_tablet_ids = nullptr; std::map<TTabletId, TVersion>* _succ_tablets; std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets = nullptr; - std::map<TTableId, int64_t>* _table_id_to_num_delta_rows = nullptr; + std::map<TTableId, std::map<TTabletId, int64_t>>* _table_id_to_tablet_id_to_num_delta_rows 
= + nullptr; }; class AsyncTabletPublishTask { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 2dc88d6df59..e2d84594547 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3204,7 +3204,7 @@ public class InternalCatalog implements CatalogIf<Database> { Database db = (Database) getDbOrDdlException(dbTbl.getDb()); OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl()); - long rowsToTruncate = 0; + HashMap<Long, Long> updateRecords = new HashMap<>(); BinlogConfig binlogConfig; olapTable.readLock(); @@ -3223,7 +3223,7 @@ public class InternalCatalog implements CatalogIf<Database> { } origPartitions.put(partName, partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); - rowsToTruncate += partition.getBaseIndex().getRowCount(); + updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); } } else { for (Partition partition : olapTable.getPartitions()) { @@ -3234,7 +3234,7 @@ public class InternalCatalog implements CatalogIf<Database> { } origPartitions.put(partition.getName(), partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); - rowsToTruncate += partition.getBaseIndex().getRowCount(); + updateRecords.put(partition.getId(), partition.getBaseIndex().getRowCount()); } } // if table currently has no partitions, this sql like empty command and do nothing, should return directly. 
@@ -3390,13 +3390,11 @@ public class InternalCatalog implements CatalogIf<Database> { erasePartitionDropBackendReplicas(oldPartitions); - HashMap<Long, Long> updateRecords = new HashMap<>(); - updateRecords.put(olapTable.getId(), rowsToTruncate); if (truncateEntireTable) { // Drop the whole table stats after truncate the entire table Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); } - Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, db.getId(), olapTable.getId()); LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index e9c850d0566..8be450404a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -511,8 +511,8 @@ public class MasterImpl { // not remove the task from queue and be will retry return; } - if (request.isSetTableIdToDeltaNumRows()) { - publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows()); + if (request.isSetTableIdToTabletIdToDeltaNumRows()) { + publishVersionTask.setTableIdTabletsDeltaRows(request.getTableIdToTabletIdToDeltaNumRows()); } AgentTaskQueue.removeTask(publishVersionTask.getBackendId(), publishVersionTask.getTaskType(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 0d4d18bc682..57109400ee7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -30,9 +30,13 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import 
org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -46,6 +50,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.expressions.Slot; @@ -86,6 +91,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -990,17 +996,33 @@ public class AnalysisManager implements Writable { } // Invoke this when load transaction finished. 
- public void updateUpdatedRows(Map<Long, Long> records) { - if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || records == null || records.isEmpty()) { + public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords, long dbId) { + if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) { return; } - for (Entry<Long, Long> record : records.entrySet()) { - TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); - if (statsStatus != null) { - statsStatus.updatedRows.addAndGet(record.getValue()); - } + UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(tabletRecords, dbId); + replayUpdateRowsRecord(updateRowsEvent); + logUpdateRowsRecord(updateRowsEvent); + } + + // Invoke this when load truncate table finished. + public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long dbId, long tableId) { + if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) { + return; + } + UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(partitionToUpdateRows, dbId, tableId); + replayUpdateRowsRecord(updateRowsEvent); + logUpdateRowsRecord(updateRowsEvent); + } + + // Invoke this for cloud version load. + public void updateUpdatedRows(Map<Long, Long> updatedRows) { + if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) { + return; } - logUpdateRowsRecord(new UpdateRowsEvent(records)); + UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(updatedRows); + replayUpdateRowsRecord(updateRowsEvent); + logUpdateRowsRecord(updateRowsEvent); } // Set to true means new partition loaded data @@ -1039,13 +1061,86 @@ public class AnalysisManager implements Writable { } public void replayUpdateRowsRecord(UpdateRowsEvent event) { - if (event == null || event.getRecords() == null) { + // For older version compatible. 
+ InternalCatalog catalog = Env.getCurrentInternalCatalog(); + if (event.getRecords() != null) { + for (Entry<Long, Long> record : event.getRecords().entrySet()) { + TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); + if (statsStatus != null) { + statsStatus.updatedRows.addAndGet(record.getValue()); + } + } return; } - for (Entry<Long, Long> record : event.getRecords().entrySet()) { - TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); + + // Record : TableId -> (TabletId -> update rows) + if (event.getTabletRecords() != null) { + for (Entry<Long, Map<Long, Long>> record : event.getTabletRecords().entrySet()) { + TableStatsMeta statsStatus = idToTblStats.get(record.getKey()); + if (statsStatus != null) { + Table table = catalog.getDb(event.getDbId()).get().getTable(record.getKey()).get(); + if (!(table instanceof OlapTable)) { + continue; + } + OlapTable olapTable = (OlapTable) table; + short replicaNum = olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum(); + Map<Long, Long> tabletRows = record.getValue(); + long tableUpdateRows = 0; + for (Entry<Long, Long> entry : tabletRows.entrySet()) { + tableUpdateRows += entry.getValue() / replicaNum; + } + statsStatus.updatedRows.addAndGet(tableUpdateRows); + if (StatisticsUtil.enablePartitionAnalyze()) { + updatePartitionRows(olapTable, tabletRows, statsStatus, replicaNum); + } + } + } + return; + } + + // Handle truncate table + if (event.getPartitionToUpdateRows() != null && event.getTableId() > 0) { + Map<Long, Long> partRows = event.getPartitionToUpdateRows(); + long totalRows = partRows.values().stream().mapToLong(rows -> rows).sum(); + TableStatsMeta statsStatus = idToTblStats.get(event.getTableId()); if (statsStatus != null) { - statsStatus.updatedRows.addAndGet(record.getValue()); + statsStatus.updatedRows.addAndGet(totalRows); + if (StatisticsUtil.enablePartitionAnalyze()) { + for (Entry<Long, Long> entry : partRows.entrySet()) { + 
statsStatus.partitionUpdateRows.computeIfPresent(entry.getKey(), + (id, rows) -> rows += entry.getValue()); + statsStatus.partitionUpdateRows.putIfAbsent(entry.getKey(), entry.getValue()); + } + } + } + } + } + + protected void updatePartitionRows(OlapTable table, Map<Long, Long> originTabletToRows, + TableStatsMeta statsStatus, short replicaNum) { + List<Partition> partitions = table.getPartitions().stream().sorted( + Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList()); + Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows); + int tabletCount = tabletToRows.size(); + for (Partition p : partitions) { + MaterializedIndex baseIndex = p.getBaseIndex(); + Iterator<Entry<Long, Long>> iterator = tabletToRows.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<Long, Long> entry = iterator.next(); + long tabletId = entry.getKey(); + Tablet tablet = baseIndex.getTablet(tabletId); + if (tablet == null) { + continue; + } + long tabletRows = entry.getValue(); + statsStatus.partitionUpdateRows.computeIfPresent(p.getId(), + (id, rows) -> rows += tabletRows / replicaNum); + statsStatus.partitionUpdateRows.putIfAbsent(p.getId(), tabletRows / replicaNum); + iterator.remove(); + tabletCount--; + } + if (tabletCount <= 0) { + break; } } } @@ -1079,10 +1174,6 @@ public class AnalysisManager implements Writable { return tableStats.findColumnStatsMeta(indexName, colName); } - public AnalysisJob findJob(long id) { - return idToAnalysisJob.get(id); - } - public AnalysisInfo findJobInfo(long id) { return analysisJobInfoMap.get(id); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 02ec6abf705..3aa7decfed3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -79,6 +79,9 @@ public class 
TableStatsMeta implements Writable { @SerializedName("userInjected") public boolean userInjected; + @SerializedName("pur") + public ConcurrentMap<Long, Long> partitionUpdateRows = new ConcurrentHashMap<>(); + @VisibleForTesting public TableStatsMeta() { tblId = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java index 8cce3d29391..c9a0b9d820d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java @@ -31,16 +31,66 @@ import java.util.Map; public class UpdateRowsEvent implements Writable { @SerializedName("records") - private Map<Long, Long> records; + private final Map<Long, Long> records; + + @SerializedName("tr") + private final Map<Long, Map<Long, Long>> tabletRecords; + + @SerializedName("dbId") + private final long dbId; + + @SerializedName("pur") + private final Map<Long, Long> partitionToUpdateRows; + + @SerializedName("tableId") + private final long tableId; public UpdateRowsEvent(Map<Long, Long> records) { this.records = records; + this.tabletRecords = null; + this.dbId = -1; + this.partitionToUpdateRows = null; + this.tableId = -1; + } + + public UpdateRowsEvent(Map<Long, Map<Long, Long>> tabletRecords, long dbId) { + this.records = null; + this.tabletRecords = tabletRecords; + this.dbId = dbId; + this.partitionToUpdateRows = null; + this.tableId = -1; + } + + public UpdateRowsEvent(Map<Long, Long> partitionToUpdateRows, long dbId, long tableId) { + this.records = null; + this.tabletRecords = null; + this.dbId = dbId; + this.partitionToUpdateRows = partitionToUpdateRows; + this.tableId = tableId; } + // TableId -> table update rows public Map<Long, Long> getRecords() { return records; } + // TableId -> (TabletId -> tablet update rows) + public Map<Long, Map<Long, Long>> getTabletRecords() { + return tabletRecords; + } + + public long 
getDbId() { + return dbId; + } + + public Map<Long, Long> getPartitionToUpdateRows() { + return partitionToUpdateRows; + } + + public long getTableId() { + return tableId; + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java index 222cf0fd78e..0cde6de539a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java @@ -50,9 +50,9 @@ public class PublishVersionTask extends AgentTask { private Map<Long, Long> succTablets; /** - * To collect loaded rows for each table from each BE + * To collect loaded rows for each tablet from each BE */ - private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap(); + private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap(); public PublishVersionTask(long backendId, long transactionId, long dbId, List<TPartitionVersionInfo> partitionVersionInfos, long createTime) { @@ -99,11 +99,11 @@ public class PublishVersionTask extends AgentTask { this.errorTablets.addAll(errorTablets); } - public void setTableIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) { - this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows); + public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows) { + this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows); } - public Map<Long, Long> getTableIdToDeltaNumRows() { - return tableIdToDeltaNumRows; + public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() { + return tableIdToTabletDeltaRows; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index e0383e6f7c0..0f709bec24f 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -2263,23 +2263,8 @@ public class DatabaseTransactionMgr { long tableVersionTime = tableCommitInfo.getVersionTime(); table.updateVisibleVersionAndTime(tableVersion, tableVersionTime); } - Map<Long, Long> tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows(); - Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap(); - tableIdToTotalNumDeltaRows - .forEach((tableId, numRows) -> { - OlapTable table = (OlapTable) db.getTableNullable(tableId); - if (table != null) { - short replicaNum = table.getTableProperty() - .getReplicaAllocation() - .getTotalReplicaNum(); - tableIdToNumDeltaRows.put(tableId, numRows / replicaNum); - } - }); - if (LOG.isDebugEnabled()) { - LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows); - } analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds); - analysisManager.updateUpdatedRows(tableIdToNumDeltaRows); + analysisManager.updateUpdatedRows(transactionState.getTableIdToTabletDeltaRows(), db.getId()); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index f4814965473..c9dd3dd258a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -151,24 +151,16 @@ public class PublishVersionDaemon extends MasterDaemon { private void tryFinishTxn(List<TransactionState> readyTransactionStates, SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr, Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) { - Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap(); + Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = 
Maps.newHashMap(); // try to finish the transaction, if failed just retry in next loop for (TransactionState transactionState : readyTransactionStates) { AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false); Set<Long> notFinishTaskBe = Sets.newHashSet(); - transactionState.getPublishVersionTasks().entrySet().forEach(entry -> { - long beId = entry.getKey(); - List<PublishVersionTask> tasks = entry.getValue(); + transactionState.getPublishVersionTasks().forEach((key, tasks) -> { + long beId = key; for (PublishVersionTask task : tasks) { if (task.isFinished()) { - if (CollectionUtils.isEmpty(task.getErrorTablets())) { - Map<Long, Long> tableIdToDeltaNumRows = task.getTableIdToDeltaNumRows(); - tableIdToDeltaNumRows.forEach((tableId, numRows) -> { - tableIdToTotalDeltaNumRows - .computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows); - tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows); - }); - } + calculateTaskUpdateRows(tableIdToTabletDeltaRows, task); } else { if (infoService.checkBackendAlive(task.getBackendId())) { hasBackendAliveAndUnfinishedTask.set(true); @@ -178,7 +170,7 @@ public class PublishVersionDaemon extends MasterDaemon { } }); - transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows); + transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows); if (LOG.isDebugEnabled()) { LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState); } @@ -236,6 +228,24 @@ public class PublishVersionDaemon extends MasterDaemon { } // end for readyTransactionStates } + // Merge task tablets update rows to tableToTabletsDelta. 
+ private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows, PublishVersionTask task) { + if (CollectionUtils.isEmpty(task.getErrorTablets())) { + for (Entry<Long, Map<Long, Long>> tableEntry : task.getTableIdToTabletDeltaRows().entrySet()) { + if (tableIdToTabletDeltaRows.containsKey(tableEntry.getKey())) { + Map<Long, Long> tabletsDelta = tableIdToTabletDeltaRows.get(tableEntry.getKey()); + for (Entry<Long, Long> tabletEntry : tableEntry.getValue().entrySet()) { + tabletsDelta.computeIfPresent(tabletEntry.getKey(), + (tabletId, origRows) -> origRows + tabletEntry.getValue()); + tabletsDelta.putIfAbsent(tabletEntry.getKey(), tabletEntry.getValue()); + } + } else { + tableIdToTabletDeltaRows.put(tableEntry.getKey(), tableEntry.getValue()); + } + } + } + } + private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState transactionState, TableCommitInfo tableCommitInfo) throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 257226b4c8a..e12eeb0c594 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -272,9 +272,9 @@ public class TransactionState implements Writable { private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap(); /** - * the value is the num delta rows of all replicas in each table + * the value is the num delta rows of all replicas in each tablet */ - private final Map<Long, Long> tableIdToTotalNumDeltaRows = Maps.newHashMap(); + private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap(); private String errorLogUrl = null; @@ -785,12 +785,12 @@ public class TransactionState implements Writable { } } - public Map<Long, Long> getTableIdToTotalNumDeltaRows() { - return tableIdToTotalNumDeltaRows; + public Map<Long, 
Map<Long, Long>> getTableIdToTabletDeltaRows() { + return tableIdToTabletDeltaRows; } - public void setTableIdToTotalNumDeltaRows(Map<Long, Long> tableIdToTotalNumDeltaRows) { - this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows); + public void setTableIdToTabletDeltaRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows) { + this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows); } public void setErrorMsg(String errMsg) { @@ -808,7 +808,7 @@ public class TransactionState implements Writable { // reduce memory public void pruneAfterVisible() { publishVersionTasks.clear(); - tableIdToTotalNumDeltaRows.clear(); + tableIdToTabletDeltaRows.clear(); // TODO if subTransactionStates can be cleared? } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index ec647dbcf92..1db7a109f55 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -71,6 +71,7 @@ struct TFinishTaskRequest { 16: optional i64 copy_time_ms 17: optional map<Types.TTabletId, Types.TVersion> succ_tablets 18: optional map<i64, i64> table_id_to_delta_num_rows + 19: optional map<i64, map<i64, i64>> table_id_to_tablet_id_to_delta_num_rows } struct TTablet { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org