This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc9d092e4fc [feature](stats)Get partition update rows. (#34908)
dc9d092e4fc is described below

commit dc9d092e4fcb2b49ab8f6904a3da290b429f4788
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Fri May 17 11:21:59 2024 +0800

    [feature](stats)Get partition update rows. (#34908)
    
    To support partition level stats collection, we need to record partition 
level update rows
    for each load transaction. This pr is to support this function.
    
    BE returns tabeltId -> updateRows map to FE, FE combine the row count of 
each tablet
    belongs to the same partition, and store the result to 
TableStatsMeta.partitionUpdateRows.
    
    Before, BE returns tableId -> updateRows map to FE, which doesn't contain 
partition level info.
---
 be/src/agent/task_worker_pool.cpp                  |   9 +-
 be/src/olap/task/engine_publish_version_task.cpp   |   8 +-
 be/src/olap/task/engine_publish_version_task.h     |   6 +-
 .../apache/doris/datasource/InternalCatalog.java   |  10 +-
 .../java/org/apache/doris/master/MasterImpl.java   |   4 +-
 .../apache/doris/statistics/AnalysisManager.java   | 123 ++++++++++++++++++---
 .../apache/doris/statistics/TableStatsMeta.java    |   3 +
 .../apache/doris/statistics/UpdateRowsEvent.java   |  52 ++++++++-
 .../org/apache/doris/task/PublishVersionTask.java  |  12 +-
 .../doris/transaction/DatabaseTransactionMgr.java  |  17 +--
 .../doris/transaction/PublishVersionDaemon.java    |  36 +++---
 .../apache/doris/transaction/TransactionState.java |  14 +--
 gensrc/thrift/MasterService.thrift                 |   1 +
 13 files changed, 219 insertions(+), 76 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 3fb5f3dfcf2..37feaf18dcd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1721,17 +1721,17 @@ void 
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
     std::map<TTabletId, TVersion> succ_tablets;
     // partition_id, tablet_id, publish_version
     std::vector<std::tuple<int64_t, int64_t, int64_t>> 
discontinuous_version_tablets;
-    std::map<TTableId, int64_t> table_id_to_num_delta_rows;
+    std::map<TTableId, std::map<TTabletId, int64_t>> 
table_id_to_tablet_id_to_num_delta_rows;
     uint32_t retry_time = 0;
     Status status;
     constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
     while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
         succ_tablets.clear();
         error_tablet_ids.clear();
-        table_id_to_num_delta_rows.clear();
+        table_id_to_tablet_id_to_num_delta_rows.clear();
         EnginePublishVersionTask engine_task(_engine, publish_version_req, 
&error_tablet_ids,
                                              &succ_tablets, 
&discontinuous_version_tablets,
-                                             &table_id_to_num_delta_rows);
+                                             
&table_id_to_tablet_id_to_num_delta_rows);
         SCOPED_ATTACH_TASK(engine_task.mem_tracker());
         status = engine_task.execute();
         if (status.ok()) {
@@ -1834,7 +1834,8 @@ void 
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
     finish_task_request.__set_succ_tablets(succ_tablets);
     finish_task_request.__set_error_tablet_ids(
             std::vector<TTabletId>(error_tablet_ids.begin(), 
error_tablet_ids.end()));
-    
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
+    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
+            table_id_to_tablet_id_to_num_delta_rows);
     finish_task(finish_task_request);
     remove_task_info(req.task_type, req.signature);
 }
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 41d28a6124b..acdcebae165 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -75,13 +75,13 @@ EnginePublishVersionTask::EnginePublishVersionTask(
         StorageEngine& engine, const TPublishVersionRequest& 
publish_version_req,
         std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* 
succ_tablets,
         std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinuous_version_tablets,
-        std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
+        std::map<TTableId, std::map<TTabletId, int64_t>>* 
table_id_to_tablet_id_to_num_delta_rows)
         : _engine(engine),
           _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
           _succ_tablets(succ_tablets),
           _discontinuous_version_tablets(discontinuous_version_tablets),
-          _table_id_to_num_delta_rows(table_id_to_num_delta_rows) {
+          
_table_id_to_tablet_id_to_num_delta_rows(table_id_to_tablet_id_to_num_delta_rows)
 {
     _mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
                                                     "TabletPublishTxnTask");
 }
@@ -340,7 +340,9 @@ void 
EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
             continue;
         }
         auto table_id = tablet->get_table_id();
-        (*_table_id_to_num_delta_rows)[table_id] += kv.second;
+        if (kv.second > 0) {
+            (*_table_id_to_tablet_id_to_num_delta_rows)[table_id][kv.first] += 
kv.second;
+        }
     }
 }
 
diff --git a/be/src/olap/task/engine_publish_version_task.h 
b/be/src/olap/task/engine_publish_version_task.h
index e4824176368..761c9358cd9 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -92,7 +92,8 @@ public:
             StorageEngine& engine, const TPublishVersionRequest& 
publish_version_req,
             std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, 
TVersion>* succ_tablets,
             std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
discontinous_version_tablets,
-            std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
+            std::map<TTableId, std::map<TTabletId, int64_t>>*
+                    table_id_to_tablet_id_to_num_delta_rows);
     ~EnginePublishVersionTask() override = default;
 
     Status execute() override;
@@ -109,7 +110,8 @@ private:
     std::set<TTabletId>* _error_tablet_ids = nullptr;
     std::map<TTabletId, TVersion>* _succ_tablets;
     std::vector<std::tuple<int64_t, int64_t, int64_t>>* 
_discontinuous_version_tablets = nullptr;
-    std::map<TTableId, int64_t>* _table_id_to_num_delta_rows = nullptr;
+    std::map<TTableId, std::map<TTabletId, int64_t>>* 
_table_id_to_tablet_id_to_num_delta_rows =
+            nullptr;
 };
 
 class AsyncTabletPublishTask {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 2dc88d6df59..e2d84594547 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3204,7 +3204,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         Database db = (Database) getDbOrDdlException(dbTbl.getDb());
         OlapTable olapTable = db.getOlapTableOrDdlException(dbTbl.getTbl());
 
-        long rowsToTruncate = 0;
+        HashMap<Long, Long> updateRecords = new HashMap<>();
 
         BinlogConfig binlogConfig;
         olapTable.readLock();
@@ -3223,7 +3223,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     }
                     origPartitions.put(partName, partition.getId());
                     partitionsDistributionInfo.put(partition.getId(), 
partition.getDistributionInfo());
-                    rowsToTruncate += partition.getBaseIndex().getRowCount();
+                    updateRecords.put(partition.getId(), 
partition.getBaseIndex().getRowCount());
                 }
             } else {
                 for (Partition partition : olapTable.getPartitions()) {
@@ -3234,7 +3234,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     }
                     origPartitions.put(partition.getName(), partition.getId());
                     partitionsDistributionInfo.put(partition.getId(), 
partition.getDistributionInfo());
-                    rowsToTruncate += partition.getBaseIndex().getRowCount();
+                    updateRecords.put(partition.getId(), 
partition.getBaseIndex().getRowCount());
                 }
             }
             // if table currently has no partitions, this sql like empty 
command and do nothing, should return directly.
@@ -3390,13 +3390,11 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
         erasePartitionDropBackendReplicas(oldPartitions);
 
-        HashMap<Long, Long> updateRecords = new HashMap<>();
-        updateRecords.put(olapTable.getId(), rowsToTruncate);
         if (truncateEntireTable) {
             // Drop the whole table stats after truncate the entire table
             Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
         }
-        
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords);
+        
Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords, 
db.getId(), olapTable.getId());
         LOG.info("finished to truncate table {}, partitions: {}", 
tblRef.getName().toSql(), tblRef.getPartitionNames());
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index e9c850d0566..8be450404a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -511,8 +511,8 @@ public class MasterImpl {
             // not remove the task from queue and be will retry
             return;
         }
-        if (request.isSetTableIdToDeltaNumRows()) {
-            
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
+        if (request.isSetTableIdToTabletIdToDeltaNumRows()) {
+            
publishVersionTask.setTableIdTabletsDeltaRows(request.getTableIdToTabletIdToDeltaNumRows());
         }
         AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
                                   publishVersionTask.getTaskType(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 0d4d18bc682..57109400ee7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -30,9 +30,13 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -46,6 +50,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.expressions.Slot;
@@ -86,6 +91,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -990,17 +996,33 @@ public class AnalysisManager implements Writable {
     }
 
     // Invoke this when load transaction finished.
-    public void updateUpdatedRows(Map<Long, Long> records) {
-        if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || 
records == null || records.isEmpty()) {
+    public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords, 
long dbId) {
+        if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
             return;
         }
-        for (Entry<Long, Long> record : records.entrySet()) {
-            TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
-            if (statsStatus != null) {
-                statsStatus.updatedRows.addAndGet(record.getValue());
-            }
+        UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(tabletRecords, 
dbId);
+        replayUpdateRowsRecord(updateRowsEvent);
+        logUpdateRowsRecord(updateRowsEvent);
+    }
+
+    // Invoke this when load truncate table finished.
+    public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long 
dbId, long tableId) {
+        if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
+            return;
+        }
+        UpdateRowsEvent updateRowsEvent = new 
UpdateRowsEvent(partitionToUpdateRows, dbId, tableId);
+        replayUpdateRowsRecord(updateRowsEvent);
+        logUpdateRowsRecord(updateRowsEvent);
+    }
+
+    // Invoke this for cloud version load.
+    public void updateUpdatedRows(Map<Long, Long> updatedRows) {
+        if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
+            return;
         }
-        logUpdateRowsRecord(new UpdateRowsEvent(records));
+        UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(updatedRows);
+        replayUpdateRowsRecord(updateRowsEvent);
+        logUpdateRowsRecord(updateRowsEvent);
     }
 
     // Set to true means new partition loaded data
@@ -1039,13 +1061,86 @@ public class AnalysisManager implements Writable {
     }
 
     public void replayUpdateRowsRecord(UpdateRowsEvent event) {
-        if (event == null || event.getRecords() == null) {
+        // For older version compatible.
+        InternalCatalog catalog = Env.getCurrentInternalCatalog();
+        if (event.getRecords() != null) {
+            for (Entry<Long, Long> record : event.getRecords().entrySet()) {
+                TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+                if (statsStatus != null) {
+                    statsStatus.updatedRows.addAndGet(record.getValue());
+                }
+            }
             return;
         }
-        for (Entry<Long, Long> record : event.getRecords().entrySet()) {
-            TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+
+        // Record : TableId -> (TabletId -> update rows)
+        if (event.getTabletRecords() != null) {
+            for (Entry<Long, Map<Long, Long>> record : 
event.getTabletRecords().entrySet()) {
+                TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
+                if (statsStatus != null) {
+                    Table table = 
catalog.getDb(event.getDbId()).get().getTable(record.getKey()).get();
+                    if (!(table instanceof OlapTable)) {
+                        continue;
+                    }
+                    OlapTable olapTable = (OlapTable) table;
+                    short replicaNum = 
olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
+                    Map<Long, Long> tabletRows = record.getValue();
+                    long tableUpdateRows = 0;
+                    for (Entry<Long, Long> entry : tabletRows.entrySet()) {
+                        tableUpdateRows += entry.getValue() / replicaNum;
+                    }
+                    statsStatus.updatedRows.addAndGet(tableUpdateRows);
+                    if (StatisticsUtil.enablePartitionAnalyze()) {
+                        updatePartitionRows(olapTable, tabletRows, 
statsStatus, replicaNum);
+                    }
+                }
+            }
+            return;
+        }
+
+        // Handle truncate table
+        if (event.getPartitionToUpdateRows() != null && event.getTableId() > 
0) {
+            Map<Long, Long> partRows = event.getPartitionToUpdateRows();
+            long totalRows = partRows.values().stream().mapToLong(rows -> 
rows).sum();
+            TableStatsMeta statsStatus = idToTblStats.get(event.getTableId());
             if (statsStatus != null) {
-                statsStatus.updatedRows.addAndGet(record.getValue());
+                statsStatus.updatedRows.addAndGet(totalRows);
+                if (StatisticsUtil.enablePartitionAnalyze()) {
+                    for (Entry<Long, Long> entry : partRows.entrySet()) {
+                        
statsStatus.partitionUpdateRows.computeIfPresent(entry.getKey(),
+                                (id, rows) -> rows += entry.getValue());
+                        
statsStatus.partitionUpdateRows.putIfAbsent(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
+        }
+    }
+
+    protected void updatePartitionRows(OlapTable table, Map<Long, Long> 
originTabletToRows,
+                                       TableStatsMeta statsStatus, short 
replicaNum) {
+        List<Partition> partitions = table.getPartitions().stream().sorted(
+                
Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList());
+        Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows);
+        int tabletCount = tabletToRows.size();
+        for (Partition p : partitions) {
+            MaterializedIndex baseIndex = p.getBaseIndex();
+            Iterator<Entry<Long, Long>> iterator = 
tabletToRows.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Entry<Long, Long> entry = iterator.next();
+                long tabletId = entry.getKey();
+                Tablet tablet = baseIndex.getTablet(tabletId);
+                if (tablet == null) {
+                    continue;
+                }
+                long tabletRows = entry.getValue();
+                statsStatus.partitionUpdateRows.computeIfPresent(p.getId(),
+                        (id, rows) -> rows += tabletRows / replicaNum);
+                statsStatus.partitionUpdateRows.putIfAbsent(p.getId(), 
tabletRows / replicaNum);
+                iterator.remove();
+                tabletCount--;
+            }
+            if (tabletCount <= 0) {
+                break;
             }
         }
     }
@@ -1079,10 +1174,6 @@ public class AnalysisManager implements Writable {
         return tableStats.findColumnStatsMeta(indexName, colName);
     }
 
-    public AnalysisJob findJob(long id) {
-        return idToAnalysisJob.get(id);
-    }
-
     public AnalysisInfo findJobInfo(long id) {
         return analysisJobInfoMap.get(id);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
index 02ec6abf705..3aa7decfed3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java
@@ -79,6 +79,9 @@ public class TableStatsMeta implements Writable {
     @SerializedName("userInjected")
     public boolean userInjected;
 
+    @SerializedName("pur")
+    public ConcurrentMap<Long, Long> partitionUpdateRows = new 
ConcurrentHashMap<>();
+
     @VisibleForTesting
     public TableStatsMeta() {
         tblId = 0;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
index 8cce3d29391..c9a0b9d820d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/UpdateRowsEvent.java
@@ -31,16 +31,66 @@ import java.util.Map;
 public class UpdateRowsEvent implements Writable {
 
     @SerializedName("records")
-    private Map<Long, Long> records;
+    private final Map<Long, Long> records;
+
+    @SerializedName("tr")
+    private final Map<Long, Map<Long, Long>> tabletRecords;
+
+    @SerializedName("dbId")
+    private final long dbId;
+
+    @SerializedName("pur")
+    private final Map<Long, Long> partitionToUpdateRows;
+
+    @SerializedName("tableId")
+    private final long tableId;
 
     public UpdateRowsEvent(Map<Long, Long> records) {
         this.records = records;
+        this.tabletRecords = null;
+        this.dbId = -1;
+        this.partitionToUpdateRows = null;
+        this.tableId = -1;
+    }
+
+    public UpdateRowsEvent(Map<Long, Map<Long, Long>> tabletRecords, long 
dbId) {
+        this.records = null;
+        this.tabletRecords = tabletRecords;
+        this.dbId = dbId;
+        this.partitionToUpdateRows = null;
+        this.tableId = -1;
+    }
+
+    public UpdateRowsEvent(Map<Long, Long> partitionToUpdateRows, long dbId, 
long tableId) {
+        this.records = null;
+        this.tabletRecords = null;
+        this.dbId = dbId;
+        this.partitionToUpdateRows = partitionToUpdateRows;
+        this.tableId = tableId;
     }
 
+    // TableId -> table update rows
     public Map<Long, Long> getRecords() {
         return records;
     }
 
+    // TableId -> (TabletId -> tablet update rows)
+    public Map<Long, Map<Long, Long>> getTabletRecords() {
+        return tabletRecords;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public Map<Long, Long> getPartitionToUpdateRows() {
+        return partitionToUpdateRows;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
index 222cf0fd78e..0cde6de539a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PublishVersionTask.java
@@ -50,9 +50,9 @@ public class PublishVersionTask extends AgentTask {
     private Map<Long, Long> succTablets;
 
     /**
-     * To collect loaded rows for each table from each BE
+     * To collect loaded rows for each tablet from each BE
      */
-    private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap();
+    private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = 
Maps.newHashMap();
 
     public PublishVersionTask(long backendId, long transactionId, long dbId,
             List<TPartitionVersionInfo> partitionVersionInfos, long 
createTime) {
@@ -99,11 +99,11 @@ public class PublishVersionTask extends AgentTask {
         this.errorTablets.addAll(errorTablets);
     }
 
-    public void setTableIdToDeltaNumRows(Map<Long, Long> 
tabletIdToDeltaNumRows) {
-        this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
+    public void setTableIdTabletsDeltaRows(Map<Long, Map<Long, Long>> 
tableIdToTabletDeltaRows) {
+        this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
     }
 
-    public Map<Long, Long> getTableIdToDeltaNumRows() {
-        return tableIdToDeltaNumRows;
+    public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
+        return tableIdToTabletDeltaRows;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index e0383e6f7c0..0f709bec24f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -2263,23 +2263,8 @@ public class DatabaseTransactionMgr {
             long tableVersionTime = tableCommitInfo.getVersionTime();
             table.updateVisibleVersionAndTime(tableVersion, tableVersionTime);
         }
-        Map<Long, Long> tableIdToTotalNumDeltaRows = 
transactionState.getTableIdToTotalNumDeltaRows();
-        Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
-        tableIdToTotalNumDeltaRows
-                        .forEach((tableId, numRows) -> {
-                            OlapTable table = (OlapTable) 
db.getTableNullable(tableId);
-                            if (table != null) {
-                                short replicaNum = table.getTableProperty()
-                                        .getReplicaAllocation()
-                                        .getTotalReplicaNum();
-                                tableIdToNumDeltaRows.put(tableId, numRows / 
replicaNum);
-                            }
-                        });
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("table id to loaded rows:{}", tableIdToNumDeltaRows);
-        }
         analysisManager.setNewPartitionLoaded(newPartitionLoadedTableIds);
-        analysisManager.updateUpdatedRows(tableIdToNumDeltaRows);
+        
analysisManager.updateUpdatedRows(transactionState.getTableIdToTabletDeltaRows(),
 db.getId());
         return true;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index f4814965473..c9dd3dd258a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -151,24 +151,16 @@ public class PublishVersionDaemon extends MasterDaemon {
     private void tryFinishTxn(List<TransactionState> readyTransactionStates,
                                      SystemInfoService infoService, 
GlobalTransactionMgrIface globalTransactionMgr,
                                      Map<Long, Long> partitionVisibleVersions, 
Map<Long, Set<Long>> backendPartitions) {
-        Map<Long, Long> tableIdToTotalDeltaNumRows = Maps.newHashMap();
+        Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = 
Maps.newHashMap();
         // try to finish the transaction, if failed just retry in next loop
         for (TransactionState transactionState : readyTransactionStates) {
             AtomicBoolean hasBackendAliveAndUnfinishedTask = new 
AtomicBoolean(false);
             Set<Long> notFinishTaskBe = Sets.newHashSet();
-            transactionState.getPublishVersionTasks().entrySet().forEach(entry 
-> {
-                long beId = entry.getKey();
-                List<PublishVersionTask> tasks = entry.getValue();
+            transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
+                long beId = key;
                 for (PublishVersionTask task : tasks) {
                     if (task.isFinished()) {
-                        if (CollectionUtils.isEmpty(task.getErrorTablets())) {
-                            Map<Long, Long> tableIdToDeltaNumRows = 
task.getTableIdToDeltaNumRows();
-                            tableIdToDeltaNumRows.forEach((tableId, numRows) 
-> {
-                                tableIdToTotalDeltaNumRows
-                                        .computeIfPresent(tableId, (id, 
orgNumRows) -> orgNumRows + numRows);
-                                
tableIdToTotalDeltaNumRows.putIfAbsent(tableId, numRows);
-                            });
-                        }
+                        calculateTaskUpdateRows(tableIdToTabletDeltaRows, 
task);
                     } else {
                         if 
(infoService.checkBackendAlive(task.getBackendId())) {
                             hasBackendAliveAndUnfinishedTask.set(true);
@@ -178,7 +170,7 @@ public class PublishVersionDaemon extends MasterDaemon {
                 }
             });
 
-            
transactionState.setTableIdToTotalNumDeltaRows(tableIdToTotalDeltaNumRows);
+            
transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, 
transactionState);
             }
@@ -236,6 +228,24 @@ public class PublishVersionDaemon extends MasterDaemon {
         } // end for readyTransactionStates
     }
 
+    // Merge task tablets update rows to tableToTabletsDelta.
+    private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>> 
tableIdToTabletDeltaRows, PublishVersionTask task) {
+        if (CollectionUtils.isEmpty(task.getErrorTablets())) {
+            for (Entry<Long, Map<Long, Long>> tableEntry : 
task.getTableIdToTabletDeltaRows().entrySet()) {
+                if (tableIdToTabletDeltaRows.containsKey(tableEntry.getKey())) 
{
+                    Map<Long, Long> tabletsDelta = 
tableIdToTabletDeltaRows.get(tableEntry.getKey());
+                    for (Entry<Long, Long> tabletEntry : 
tableEntry.getValue().entrySet()) {
+                        tabletsDelta.computeIfPresent(tabletEntry.getKey(),
+                                (tabletId, origRows) -> origRows + 
tabletEntry.getValue());
+                        tabletsDelta.putIfAbsent(tabletEntry.getKey(), 
tabletEntry.getValue());
+                    }
+                } else {
+                    tableIdToTabletDeltaRows.put(tableEntry.getKey(), 
tableEntry.getValue());
+                }
+            }
+        }
+    }
+
     private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState 
transactionState,
             TableCommitInfo tableCommitInfo) throws MetaNotFoundException {
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 257226b4c8a..e12eeb0c594 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -272,9 +272,9 @@ public class TransactionState implements Writable {
     private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();
 
     /**
-     * the value is the num delta rows of all replicas in each table
+     * the value is the num delta rows of all replicas in each tablet
      */
-    private final Map<Long, Long> tableIdToTotalNumDeltaRows = 
Maps.newHashMap();
+    private final Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = 
Maps.newHashMap();
 
     private String errorLogUrl = null;
 
@@ -785,12 +785,12 @@ public class TransactionState implements Writable {
         }
     }
 
-    public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
-        return tableIdToTotalNumDeltaRows;
+    public Map<Long, Map<Long, Long>> getTableIdToTabletDeltaRows() {
+        return tableIdToTabletDeltaRows;
     }
 
-    public void setTableIdToTotalNumDeltaRows(Map<Long, Long> 
tableIdToTotalNumDeltaRows) {
-        this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows);
+    public void setTableIdToTabletDeltaRows(Map<Long, Map<Long, Long>> 
tableIdToTabletDeltaRows) {
+        this.tableIdToTabletDeltaRows.putAll(tableIdToTabletDeltaRows);
     }
 
     public void setErrorMsg(String errMsg) {
@@ -808,7 +808,7 @@ public class TransactionState implements Writable {
     // reduce memory
     public void pruneAfterVisible() {
         publishVersionTasks.clear();
-        tableIdToTotalNumDeltaRows.clear();
+        tableIdToTabletDeltaRows.clear();
         // TODO if subTransactionStates can be cleared?
     }
 
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index ec647dbcf92..1db7a109f55 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -71,6 +71,7 @@ struct TFinishTaskRequest {
     16: optional i64 copy_time_ms
     17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
     18: optional map<i64, i64> table_id_to_delta_num_rows
+    19: optional map<i64, map<i64, i64>> 
table_id_to_tablet_id_to_delta_num_rows
 }
 
 struct TTablet {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to