This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f58d62ecd8f branch-4.0: [fix](cloud) fix table and partition get_version #60064 (#60202)
f58d62ecd8f is described below

commit f58d62ecd8f091543afa91f37cde899951839069
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 26 09:52:11 2026 +0800

    branch-4.0: [fix](cloud) fix table and partition get_version #60064 (#60202)
    
    Cherry-picked from #60064
    
    Co-authored-by: meiyi <[email protected]>
---
 .../java/org/apache/doris/catalog/OlapTable.java   | 36 +++++++++++++---------
 .../apache/doris/cloud/catalog/CloudPartition.java | 32 ++++++++++++-------
 .../java/org/apache/doris/qe/SessionVariable.java  |  2 +-
 .../doris/cloud/catalog/CloudPartitionTest.java    | 23 ++++++++++----
 4 files changed, 61 insertions(+), 32 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1da1ab08b8d..1e69e9004fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -62,6 +62,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.proto.OlapFile.EncryptionAlgorithmPB;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.rpc.RpcException;
@@ -236,8 +237,8 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
 
     // Cache for table version in cloud mode
     // This value is set when get the table version from meta-service, 0 means 
version is not cached yet
-    private long lastTableVersionCachedTimeMs = 0;
-    private long cachedTableVersion = -1;
+    private volatile long lastTableVersionCachedTimeMs = 0;
+    private volatile long cachedTableVersion = -1;
 
     public OlapTable() {
         // for persist
@@ -3287,24 +3288,24 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         }
     }
 
-    public boolean isCachedTableVersionExpired() {
+    @VisibleForTesting
+    protected boolean isCachedTableVersionExpired() {
         // -1 means no cache yet, need to fetch from MS
         if (cachedTableVersion == -1) {
             return true;
         }
         ConnectContext ctx = ConnectContext.get();
-        if (ctx == null) {
-            return true;
-        }
-        long cacheExpirationMs = 
ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
+        long cacheExpirationMs = ctx == null ? 
VariableMgr.getDefaultSessionVariable().cloudTableVersionCacheTtlMs
+                : ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
         if (cacheExpirationMs <= 0) { // always expired
             return true;
         }
         return System.currentTimeMillis() - lastTableVersionCachedTimeMs > 
cacheExpirationMs;
     }
 
-    public void setCachedTableVersion(long version) {
-        if (version > cachedTableVersion) {
+    @VisibleForTesting
+    protected void setCachedTableVersion(long version) {
+        if (version >= cachedTableVersion) {
             cachedTableVersion = version;
             lastTableVersionCachedTimeMs = System.currentTimeMillis();
         }
@@ -3358,9 +3359,9 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
     }
 
     // Get the table versions in batch.
-    public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> 
tables) {
+    public static List<Long> getVisibleVersionInBatch(List<OlapTable> tables) {
         if (tables.isEmpty()) {
-            return new ArrayList<>();
+            return Collections.emptyList();
         }
 
         if (Config.isNotCloudMode()) {
@@ -3369,14 +3370,21 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
                     .collect(Collectors.toList());
         }
 
-        List<Long> dbIds = new ArrayList<>();
-        List<Long> tableIds = new ArrayList<>();
+        List<Long> dbIds = new ArrayList<>(tables.size());
+        List<Long> tableIds = new ArrayList<>(tables.size());
         for (OlapTable table : tables) {
             dbIds.add(table.getDatabase().getId());
             tableIds.add(table.getId());
         }
 
-        return getVisibleVersionFromMeta(dbIds, tableIds);
+        List<Long> versions = getVisibleVersionFromMeta(dbIds, tableIds);
+
+        // update cache
+        Preconditions.checkState(tables.size() == versions.size());
+        for (int i = 0; i < tables.size(); i++) {
+            tables.get(i).setCachedTableVersion(versions.get(i));
+        }
+        return versions;
     }
 
     private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, 
List<Long> tableIds) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index 16c4a9f8bce..788b4c6dbfa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -29,16 +29,18 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -56,7 +58,7 @@ public class CloudPartition extends Partition {
     private long tableId;
 
     // This value is set when get the version from meta-service, 0 means 
version is not cached yet
-    private long lastVersionCachedTimeMs = 0;
+    private volatile long lastVersionCachedTimeMs = 0;
 
     private ReentrantLock lock = new ReentrantLock(true);
 
@@ -96,7 +98,7 @@ public class CloudPartition extends Partition {
         return;
     }
 
-    public void setCachedVisibleVersion(long version, Long 
versionUpdateTimeMs) {
+    public void setCachedVisibleVersion(long version, long 
versionUpdateTimeMs) {
         // we only care the version should increase monotonically and ignore 
the readers
         LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {}, 
old version: {}",
                 super.getId(), version, super.getVisibleVersion());
@@ -115,8 +117,14 @@ public class CloudPartition extends Partition {
         return super.getVisibleVersion();
     }
 
-    public boolean isCachedVersionExpired() {
-        long cacheExpirationMs = 
SessionVariable.cloudPartitionVersionCacheTtlMs;
+    @VisibleForTesting
+    protected boolean isCachedVersionExpired() {
+        if (lastVersionCachedTimeMs == 0) {
+            return true;
+        }
+        ConnectContext ctx = ConnectContext.get();
+        long cacheExpirationMs = ctx == null ? 
VariableMgr.getDefaultSessionVariable().cloudPartitionVersionCacheTtlMs
+                : ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs;
         if (cacheExpirationMs <= 0) { // always expired
             return true;
         }
@@ -148,6 +156,7 @@ public class CloudPartition extends Partition {
                 .setTableId(this.tableId)
                 .setPartitionId(super.getId())
                 .setBatchMode(false)
+                .setWaitForPendingTxn(waitForPendingTxns)
                 .build();
 
         try {
@@ -251,16 +260,18 @@ public class CloudPartition extends Partition {
     // Return the visible version in order of the specified partition ids, -1 
means version NOT FOUND.
     public static List<Long> getSnapshotVisibleVersion(List<CloudPartition> 
partitions) throws RpcException {
         if (partitions.isEmpty()) {
-            return new ArrayList<>();
+            return Collections.emptyList();
         }
 
-        if (SessionVariable.cloudPartitionVersionCacheTtlMs <= 0) { // No 
cached versions will be used
+        long cloudPartitionVersionCacheTtlMs = ConnectContext.get() == null ? 0
+                : 
ConnectContext.get().getSessionVariable().cloudPartitionVersionCacheTtlMs;
+        if (cloudPartitionVersionCacheTtlMs <= 0) { // No cached versions will 
be used
             return getSnapshotVisibleVersionFromMs(partitions, false);
         }
 
         // partitionId -> cachedVersion
-        List<Pair<Long, Long>> allVersions = new ArrayList<>();
-        List<CloudPartition> expiredPartitions = new ArrayList<>();
+        List<Pair<Long, Long>> allVersions = new 
ArrayList<>(partitions.size());
+        List<CloudPartition> expiredPartitions = new 
ArrayList<>(partitions.size());
         for (CloudPartition partition : partitions) {
             long ver = partition.getCachedVisibleVersion();
             if (partition.isCachedVersionExpired()) {
@@ -272,8 +283,7 @@ public class CloudPartition extends Partition {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={}, 
numFilteredPartitions={}",
-                    SessionVariable.cloudPartitionVersionCacheTtlMs,
-                    partitions.size(), partitions.size() - 
expiredPartitions.size());
+                    cloudPartitionVersionCacheTtlMs, partitions.size(), 
partitions.size() - expiredPartitions.size());
         }
 
         List<Long> versions = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7f875cd5e3d..e9378840101 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2807,7 +2807,7 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
     public boolean disableEmptyPartitionPrune = false;
     @VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
-    public static long cloudPartitionVersionCacheTtlMs = 0;
+    public long cloudPartitionVersionCacheTtlMs = 0;
     @VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
     public long cloudTableVersionCacheTtlMs = 0;
     // CLOUD_VARIABLES_END
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
index 3618b581c77..8213cd6c307 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.catalog;
 
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rpc.RpcException;
 
@@ -44,13 +45,18 @@ public class CloudPartitionTest {
 
     @Test
     public void testIsCachedVersionExpired() {
+        // Create ConnectContext with SessionVariable
+        ConnectContext ctx = new ConnectContext();
+        ctx.setSessionVariable(new SessionVariable());
+        ctx.setThreadLocalInfo();
+
         // test isCachedVersionExpired
         CloudPartition part = createPartition(1, 2, 3);
-        SessionVariable.cloudPartitionVersionCacheTtlMs = 0;
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0;
         Assertions.assertTrue(part.isCachedVersionExpired());
-        SessionVariable.cloudPartitionVersionCacheTtlMs = -10086;
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -10086;
         part.setCachedVisibleVersion(2, 10086L); // update version and last 
cache time
-        SessionVariable.cloudPartitionVersionCacheTtlMs = 10000;
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 10000;
         Assertions.assertFalse(part.isCachedVersionExpired()); // not expired 
due to long expiration duration
         Assertions.assertEquals(2, part.getCachedVisibleVersion());
 
@@ -58,6 +64,11 @@ public class CloudPartitionTest {
 
     @Test
     public void testCachedVersion() throws RpcException {
+        // Create ConnectContext with SessionVariable
+        ConnectContext ctx = new ConnectContext();
+        ctx.setSessionVariable(new SessionVariable());
+        ctx.setThreadLocalInfo();
+
         CloudPartition part = createPartition(1, 2, 3);
         List<CloudPartition> parts = new ArrayList<>();
         for (long i = 0; i < 3; ++i) {
@@ -87,7 +98,7 @@ public class CloudPartitionTest {
         };
         // CHECKSTYLE ON
 
-        SessionVariable.cloudPartitionVersionCacheTtlMs = -1; // disable cache
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -1; // 
disable cache
             {
                 // test single get version
                 Assertions.assertEquals(2, part.getVisibleVersion()); // 
should not get from cache
@@ -106,7 +117,7 @@ public class CloudPartitionTest {
             }
 
         // enable change expiration and make it cached in long duration
-        SessionVariable.cloudPartitionVersionCacheTtlMs = 100000;
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 100000;
             {
                 // test single get version
                 Assertions.assertEquals(2, part.getVisibleVersion()); // 
cached version
@@ -125,7 +136,7 @@ public class CloudPartitionTest {
             }
 
         // enable change expiration and make it expired
-        SessionVariable.cloudPartitionVersionCacheTtlMs = 500;
+        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 500;
         try {
             Thread.sleep(550);
         } catch (InterruptedException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to