This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bea5def48e2 [improvement](cloud) reduce call for partition get visible version (#36309) bea5def48e2 is described below commit bea5def48e20e7ccd357729c781c816d7e07c7c3 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Mon Jun 17 09:50:46 2024 +0800 [improvement](cloud) reduce call for partition get visible version (#36309) In cloud mode, getting a partition's visible version requires an RPC, so calls to get a partition's visible version should be minimized. --- .../main/java/org/apache/doris/backup/RestoreJob.java | 5 +++-- .../main/java/org/apache/doris/catalog/OlapTable.java | 3 ++- .../main/java/org/apache/doris/catalog/Partition.java | 12 ++++++++++++ .../apache/doris/common/proc/TabletHealthProcDir.java | 18 ++++++++++++++---- .../java/org/apache/doris/planner/OlapTableSink.java | 18 ++++++++++++++---- .../main/java/org/apache/doris/system/Diagnoser.java | 5 +++-- 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6928f8ef2f1..859d3ac5f0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1800,13 +1800,14 @@ public class RestoreJob extends AbstractJob { // update partition visible version part.updateVersionForRestore(entry.getValue()); + long visibleVersion = part.getVisibleVersion(); // we also need to update the replica version of these overwritten restored partitions for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { - if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) { - replica.updateVersion(part.getVisibleVersion()); + if (!replica.checkVersionCatchUp(visibleVersion, false)) { + replica.updateVersion(visibleVersion); } } } diff
--git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 406c6d61793..479d214b7a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -675,6 +675,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { Map<Tag, Integer> nextIndexes = Maps.newHashMap(); for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) { Partition partition = entry.getValue(); + long visibleVersion = partition.getVisibleVersion(); // entry.getKey() is the new partition id, use it to get the restore specified // replica allocation ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(entry.getKey()); @@ -717,7 +718,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { for (Long beId : entry3.getValue()) { long newReplicaId = env.getNextId(); Replica replica = new Replica(newReplicaId, beId, ReplicaState.NORMAL, - partition.getVisibleVersion(), schemaHash); + visibleVersion, schemaHash); newTablet.addReplica(replica, true /* is restore */); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 1c9ec4e49a3..bb09773112d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -20,9 +20,12 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.MaterializedIndex.IndexState; +import org.apache.doris.cloud.catalog.CloudPartition; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import 
org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -37,6 +40,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Internal representation of partition-related metadata. @@ -169,6 +173,14 @@ public class Partition extends MetaObject implements Writable { return visibleVersionTime; } + public static List<Long> getVisibleVersions(List<? extends Partition> partitions) throws RpcException { + if (Config.isCloudMode()) { + return CloudPartition.getSnapshotVisibleVersion((List<CloudPartition>) partitions); + } else { + return partitions.stream().map(Partition::getVisibleVersion).collect(Collectors.toList()); + } + } + /** * if visibleVersion is 1, do not return creation time but 0 * diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index e28c74c327e..e858b7f58c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -33,6 +33,7 @@ import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.rpc.RpcException; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskQueue; @@ -40,6 +41,7 @@ import org.apache.doris.thrift.TTaskType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Comparator; @@ -180,7 +182,16 @@ public class TabletHealthProcDir implements ProcDirInterface { ? 
colocateTableIndex.getGroup(olapTable.getId()) : null; olapTable.readLock(); try { - for (Partition partition : olapTable.getAllPartitions()) { + List<Partition> partitions = Lists.newArrayList(olapTable.getAllPartitions()); + List<Long> visibleVersions = null; + try { + visibleVersions = Partition.getVisibleVersions(partitions); + } catch (RpcException e) { + throw new RuntimeException("get version from meta service failed:" + e.getMessage()); + } + for (int j = 0; j < partitions.size(); j++) { + Partition partition = partitions.get(j); + long visibleVersion = visibleVersions.get(j); ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo() .getReplicaAllocation(partition.getId()); for (MaterializedIndex materializedIndex : partition.getMaterializedIndices( @@ -196,12 +207,11 @@ public class TabletHealthProcDir implements ProcDirInterface { replicaAlloc = groupSchema.getReplicaAlloc(); } Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i); - res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, - backendsSet); + res = tablet.getColocateHealthStatus(visibleVersion, replicaAlloc, backendsSet); } else { Pair<Tablet.TabletStatus, TabletSchedCtx.Priority> pair = tablet.getHealthStatusWithPriority(infoService, - partition.getVisibleVersion(), replicaAlloc, aliveBeIds); + visibleVersion, replicaAlloc, aliveBeIds); res = pair.first; } switch (res) { // CHECKSTYLE IGNORE THIS LINE: missing switch default diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 7e27dd8a606..8c40e467338 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -55,6 +55,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugPointUtil.DebugPoint; 
import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TColumn; @@ -623,8 +624,17 @@ public class OlapTableSink extends DataSink { TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash Multimap<Long, Long> allBePathsMap = HashMultimap.create(); - for (Long partitionId : partitionIds) { - Partition partition = table.getPartition(partitionId); + List<Partition> partitions = partitionIds.stream().map(partitionId -> table.getPartition(partitionId)) + .collect(Collectors.toList()); + List<Long> visibleVersions = null; + try { + visibleVersions = Partition.getVisibleVersions(partitions); + } catch (RpcException e) { + throw new UserException("OlapTableSink get partition visible version failed", e); + } + for (int i = 0; i < partitions.size(); i++) { + Partition partition = partitions.get(i); + long visibleVersion = visibleVersions.get(i); int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId()); for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { // we should ensure the replica backend is alive @@ -635,14 +645,14 @@ public class OlapTableSink extends DataSink { String errMsg = "tablet " + tablet.getId() + " alive replica num " + bePathsMap.keySet().size() + " < load required replica num " + loadRequiredReplicaNum + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]" - + ", detail: " + tablet.getDetailsStatusForQuery(partition.getVisibleVersion()); + + ", detail: " + tablet.getDetailsStatusForQuery(visibleVersion); if (Config.isCloudMode()) { errMsg += ConnectContext.cloudNoBackendsReason(); } throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsg); } - debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap); + debugWriteRandomChooseSink(tablet, visibleVersion, 
bePathsMap); if (bePathsMap.keySet().isEmpty()) { throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "tablet " + tablet.getId() + " no available replica"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java index 5e7748a3524..c2a091d11c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java @@ -109,6 +109,7 @@ public class Diagnoser { StringBuilder versionErr = new StringBuilder(); StringBuilder statusErr = new StringBuilder(); StringBuilder compactionErr = new StringBuilder(); + long visibleVersion = partition.getVisibleVersion(); for (Replica replica : replicas) { // backend do { @@ -139,10 +140,10 @@ public class Diagnoser { } } while (false); // version - if (replica.getVersion() != partition.getVisibleVersion()) { + if (replica.getVersion() != visibleVersion) { versionErr.append("Replica on backend " + replica.getBackendId() + "'s version (" + replica.getVersion() + ") does not equal" - + " to partition visible version (" + partition.getVisibleVersion() + ")"); + + " to partition visible version (" + visibleVersion + ")"); } else if (replica.getLastFailedVersion() != -1) { versionErr.append("Replica on backend " + replica.getBackendId() + "'s last failed version is " + replica.getLastFailedVersion()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org