This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bea5def48e2 [improvement](cloud) reduce call for partition get visible 
version (#36309)
bea5def48e2 is described below

commit bea5def48e20e7ccd357729c781c816d7e07c7c3
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Mon Jun 17 09:50:46 2024 +0800

    [improvement](cloud) reduce call for partition get visible version (#36309)
    
    In cloud mode, getting a partition's visible version requires an RPC, so
    reduce the number of calls that fetch a partition's visible version.
---
 .../main/java/org/apache/doris/backup/RestoreJob.java  |  5 +++--
 .../main/java/org/apache/doris/catalog/OlapTable.java  |  3 ++-
 .../main/java/org/apache/doris/catalog/Partition.java  | 12 ++++++++++++
 .../apache/doris/common/proc/TabletHealthProcDir.java  | 18 ++++++++++++++----
 .../java/org/apache/doris/planner/OlapTableSink.java   | 18 ++++++++++++++----
 .../main/java/org/apache/doris/system/Diagnoser.java   |  5 +++--
 6 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 6928f8ef2f1..859d3ac5f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1800,13 +1800,14 @@ public class RestoreJob extends AbstractJob {
 
                     // update partition visible version
                     part.updateVersionForRestore(entry.getValue());
+                    long visibleVersion = part.getVisibleVersion();
 
                     // we also need to update the replica version of these 
overwritten restored partitions
                     for (MaterializedIndex idx : 
part.getMaterializedIndices(IndexExtState.VISIBLE)) {
                         for (Tablet tablet : idx.getTablets()) {
                             for (Replica replica : tablet.getReplicas()) {
-                                if 
(!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) {
-                                    
replica.updateVersion(part.getVisibleVersion());
+                                if 
(!replica.checkVersionCatchUp(visibleVersion, false)) {
+                                    replica.updateVersion(visibleVersion);
                                 }
                             }
                         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 406c6d61793..479d214b7a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -675,6 +675,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         Map<Tag, Integer> nextIndexes = Maps.newHashMap();
         for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
             Partition partition = entry.getValue();
+            long visibleVersion = partition.getVisibleVersion();
             // entry.getKey() is the new partition id, use it to get the 
restore specified
             // replica allocation
             ReplicaAllocation replicaAlloc = 
partitionInfo.getReplicaAllocation(entry.getKey());
@@ -717,7 +718,7 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
                             for (Long beId : entry3.getValue()) {
                                 long newReplicaId = env.getNextId();
                                 Replica replica = new Replica(newReplicaId, 
beId, ReplicaState.NORMAL,
-                                        partition.getVisibleVersion(), 
schemaHash);
+                                        visibleVersion, schemaHash);
                                 newTablet.addReplica(replica, true /* is 
restore */);
                             }
                         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 1c9ec4e49a3..bb09773112d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -20,9 +20,12 @@ package org.apache.doris.catalog;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.rpc.RpcException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -37,6 +40,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Internal representation of partition-related metadata.
@@ -169,6 +173,14 @@ public class Partition extends MetaObject implements 
Writable {
         return visibleVersionTime;
     }
 
+    public static List<Long> getVisibleVersions(List<? extends Partition> 
partitions) throws RpcException {
+        if (Config.isCloudMode()) {
+            return 
CloudPartition.getSnapshotVisibleVersion((List<CloudPartition>) partitions);
+        } else {
+            return 
partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList());
+        }
+    }
+
     /**
      * if visibleVersion is 1, do not return creation time but 0
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index e28c74c327e..e858b7f58c5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -33,6 +33,7 @@ import org.apache.doris.clone.TabletSchedCtx;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
@@ -40,6 +41,7 @@ import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -180,7 +182,16 @@ public class TabletHealthProcDir implements 
ProcDirInterface {
                         ? colocateTableIndex.getGroup(olapTable.getId()) : 
null;
                 olapTable.readLock();
                 try {
-                    for (Partition partition : olapTable.getAllPartitions()) {
+                    List<Partition> partitions = 
Lists.newArrayList(olapTable.getAllPartitions());
+                    List<Long> visibleVersions = null;
+                    try {
+                        visibleVersions = 
Partition.getVisibleVersions(partitions);
+                    } catch (RpcException e) {
+                        throw new RuntimeException("get version from meta 
service failed:" + e.getMessage());
+                    }
+                    for (int j = 0; j < partitions.size(); j++) {
+                        Partition partition = partitions.get(j);
+                        long visibleVersion = visibleVersions.get(j);
                         ReplicaAllocation replicaAlloc = 
olapTable.getPartitionInfo()
                                 .getReplicaAllocation(partition.getId());
                         for (MaterializedIndex materializedIndex : 
partition.getMaterializedIndices(
@@ -196,12 +207,11 @@ public class TabletHealthProcDir implements 
ProcDirInterface {
                                         replicaAlloc = 
groupSchema.getReplicaAlloc();
                                     }
                                     Set<Long> backendsSet = 
colocateTableIndex.getTabletBackendsByGroup(groupId, i);
-                                    res = 
tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc,
-                                            backendsSet);
+                                    res = 
tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet);
                                 } else {
                                     Pair<Tablet.TabletStatus, 
TabletSchedCtx.Priority> pair
                                             = 
tablet.getHealthStatusWithPriority(infoService,
-                                            partition.getVisibleVersion(), 
replicaAlloc, aliveBeIds);
+                                            visibleVersion, replicaAlloc, 
aliveBeIds);
                                     res = pair.first;
                                 }
                                 switch (res) { // CHECKSTYLE IGNORE THIS LINE: 
missing switch default
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 7e27dd8a606..8c40e467338 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -55,6 +55,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TColumn;
@@ -623,8 +624,17 @@ public class OlapTableSink extends DataSink {
         TOlapTableLocationParam slaveLocationParam = new 
TOlapTableLocationParam();
         // BE id -> path hash
         Multimap<Long, Long> allBePathsMap = HashMultimap.create();
-        for (Long partitionId : partitionIds) {
-            Partition partition = table.getPartition(partitionId);
+        List<Partition> partitions = partitionIds.stream().map(partitionId -> 
table.getPartition(partitionId))
+                .collect(Collectors.toList());
+        List<Long> visibleVersions = null;
+        try {
+            visibleVersions = Partition.getVisibleVersions(partitions);
+        } catch (RpcException e) {
+            throw new UserException("OlapTableSink get partition visible 
version failed", e);
+        }
+        for (int i = 0; i < partitions.size(); i++) {
+            Partition partition = partitions.get(i);
+            long visibleVersion = visibleVersions.get(i);
             int loadRequiredReplicaNum = 
table.getLoadRequiredReplicaNum(partition.getId());
             for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
                 // we should ensure the replica backend is alive
@@ -635,14 +645,14 @@ public class OlapTableSink extends DataSink {
                         String errMsg = "tablet " + tablet.getId() + " alive 
replica num " + bePathsMap.keySet().size()
                                 + " < load required replica num " + 
loadRequiredReplicaNum
                                 + ", alive backends: [" + 
StringUtils.join(bePathsMap.keySet(), ",") + "]"
-                                + ", detail: " + 
tablet.getDetailsStatusForQuery(partition.getVisibleVersion());
+                                + ", detail: " + 
tablet.getDetailsStatusForQuery(visibleVersion);
                         if (Config.isCloudMode()) {
                             errMsg += ConnectContext.cloudNoBackendsReason();
                         }
                         throw new 
UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg);
                     }
 
-                    debugWriteRandomChooseSink(tablet, 
partition.getVisibleVersion(), bePathsMap);
+                    debugWriteRandomChooseSink(tablet, visibleVersion, 
bePathsMap);
                     if (bePathsMap.keySet().isEmpty()) {
                         throw new 
UserException(InternalErrorCode.REPLICA_FEW_ERR,
                                 "tablet " + tablet.getId() + " no available 
replica");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
index 5e7748a3524..c2a091d11c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java
@@ -109,6 +109,7 @@ public class Diagnoser {
         StringBuilder versionErr = new StringBuilder();
         StringBuilder statusErr = new StringBuilder();
         StringBuilder compactionErr = new StringBuilder();
+        long visibleVersion = partition.getVisibleVersion();
         for (Replica replica : replicas) {
             // backend
             do {
@@ -139,10 +140,10 @@ public class Diagnoser {
                 }
             } while (false);
             // version
-            if (replica.getVersion() != partition.getVisibleVersion()) {
+            if (replica.getVersion() != visibleVersion) {
                 versionErr.append("Replica on backend " + 
replica.getBackendId() + "'s version ("
                         + replica.getVersion() + ") does not equal"
-                        + " to partition visible version (" + 
partition.getVisibleVersion() + ")");
+                        + " to partition visible version (" + visibleVersion + 
")");
             } else if (replica.getLastFailedVersion() != -1) {
                 versionErr.append("Replica on backend " + 
replica.getBackendId() + "'s last failed version is "
                         + replica.getLastFailedVersion());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to