This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4a1d3df190d [Fix](cloud) Fix getVisibleVersion returning RuntimeException (#51044) 4a1d3df190d is described below commit 4a1d3df190d68fa0af31c4a63ddd1df22827eb2c Author: deardeng <deng...@selectdb.com> AuthorDate: Thu Jun 12 21:00:57 2025 +0800 [Fix](cloud) Fix getVisibleVersion returning RuntimeException (#51044) In cloud mode, getVisibleVersion now throws the checked RpcException instead of wrapping it in a RuntimeException, letting the caller decide how to handle the failure. --- .../java/org/apache/doris/catalog/OlapTable.java | 83 ++++++++++------------ .../SupportBinarySearchFilteringPartitions.java | 3 +- .../cache/NereidsSortedPartitionsCacheManager.java | 27 +++++-- .../doris/common/cache/NereidsSqlCacheManager.java | 19 ++++- .../apache/doris/common/proc/TablesProcDir.java | 13 +++- .../apache/doris/datasource/InternalCatalog.java | 4 +- .../org/apache/doris/nereids/SqlCacheContext.java | 18 ++++- .../trees/plans/logical/LogicalOlapScan.java | 11 ++- .../trees/plans/physical/PhysicalOlapScan.java | 15 +++- .../org/apache/doris/qe/cache/CacheAnalyzer.java | 12 ++-- .../apache/doris/statistics/AnalysisManager.java | 21 +++++- .../doris/statistics/StatisticsAutoCollector.java | 11 ++- .../doris/statistics/util/StatisticsUtil.java | 9 ++- .../transaction/GlobalTransactionMgrTest.java | 11 ++- 14 files changed, 185 insertions(+), 72 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 7dcb23dd03a..03c5cca5188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -140,7 +140,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } @Override - public Object getPartitionMetaVersion(CatalogRelation scan) { + public Object getPartitionMetaVersion(CatalogRelation scan) throws RpcException { return 
getVisibleVersion(); } @@ -1420,19 +1420,16 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return partition; } - public void getVersionInBatchForCloudMode(Collection<Long> partitionIds) { - if (Config.isCloudMode()) { // do nothing for non-cloud mode - List<CloudPartition> partitions = partitionIds.stream() - .sorted() - .map(this::getPartition) - .map(partition -> (CloudPartition) partition) - .collect(Collectors.toList()); - try { - CloudPartition.getSnapshotVisibleVersion(partitions); - } catch (RpcException e) { - throw new RuntimeException(e); - } + public void getVersionInBatchForCloudMode(Collection<Long> partitionIds) throws RpcException { + if (Config.isNotCloudMode()) { + return; } + List<CloudPartition> partitions = partitionIds.stream() + .sorted() + .map(this::getPartition) + .map(partition -> (CloudPartition) partition) + .collect(Collectors.toList()); + CloudPartition.getSnapshotVisibleVersion(partitions); } // select the non-empty partition ids belonging to this table. 
@@ -3320,18 +3317,22 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc // During `getNextVersion` and `updateVisibleVersionAndTime` period, // the write lock on the table should be held continuously public long getNextVersion() { - if (!Config.isCloudMode()) { + if (Config.isNotCloudMode()) { return tableAttributes.getNextVersion(); - } else { - // cloud mode should not reach here - if (LOG.isDebugEnabled()) { - LOG.debug("getNextVersion in Cloud mode in OlapTable {} ", getName()); - } + } + // cloud mode should not reach here + if (LOG.isDebugEnabled()) { + LOG.debug("getNextVersion in Cloud mode in OlapTable {} ", getName()); + } + try { return getVisibleVersion() + 1; + } catch (RpcException e) { + LOG.warn("getNextVersion in Cloud mode in OlapTable {}", getName(), e); + throw new RuntimeException(e); } } - public long getVisibleVersion() { + public long getVisibleVersion() throws RpcException { if (Config.isNotCloudMode()) { return tableAttributes.getVisibleVersion(); } @@ -3361,30 +3362,11 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } return version; } catch (RpcException e) { - throw new RuntimeException("get version from meta service failed", e); + LOG.warn("get version from meta service failed", e); + throw e; } } - // Get the table versions in batch. 
- public static List<Long> getVisibleVersionByTableIds(Collection<Long> tableIds) { - List<OlapTable> tables = new ArrayList<>(); - - InternalCatalog catalog = Env.getCurrentEnv().getInternalCatalog(); - for (long tableId : tableIds) { - Table table = catalog.getTableByTableId(tableId); - if (table == null) { - throw new RuntimeException("get table visible version failed, no such table " + tableId + " exists"); - } - if (table.getType() != TableType.OLAP) { - throw new RuntimeException( - "get table visible version failed, table " + tableId + " is not a OLAP table"); - } - tables.add((OlapTable) table); - } - - return getVisibleVersionInBatch(tables); - } - // Get the table versions in batch. public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> tables) { if (tables.isEmpty()) { @@ -3467,7 +3449,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc // return version rather than time because: // 1. they are all incremental // 2. more reasonable for UnassignedAllBEJob to compare version this time we plan and last succeed refresh. 
- return getVisibleVersion(); // for both cloud and non-cloud mode + try { + return getVisibleVersion(); // for both cloud and non-cloud mode + } catch (RpcException e) { + LOG.warn("get visible version, table {}", getName(), e); + return 0; + } } @Override @@ -3521,7 +3508,8 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions(); if (tableVersions.containsKey(id)) { // hits cache return new MTMVVersionSnapshot(tableVersions.get(id), id); @@ -3531,8 +3519,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } @Override - public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) { - return new MTMVVersionSnapshot(getVisibleVersion(), id); + public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException { + try { + return new MTMVVersionSnapshot(getVisibleVersion(), id); + } catch (RpcException e) { + LOG.warn("getVisibleVersion failed", e); + throw new AnalysisException("getVisibleVersion failed " + e.getMessage()); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SupportBinarySearchFilteringPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SupportBinarySearchFilteringPartitions.java index 8ba63086a64..d9833d1d297 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SupportBinarySearchFilteringPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SupportBinarySearchFilteringPartitions.java @@ -18,6 +18,7 @@ package org.apache.doris.catalog; import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; +import org.apache.doris.rpc.RpcException; import 
java.util.Map; @@ -36,7 +37,7 @@ public interface SupportBinarySearchFilteringPartitions extends TableIf { * return the version of the partitions meta, if the version changed, we should skip the legacy sorted * partitions and reload it. */ - Object getPartitionMetaVersion(CatalogRelation scan); + Object getPartitionMetaVersion(CatalogRelation scan) throws RpcException; /** * when the partition meta loaded? if the partition meta load too frequently, we will skip sort partitions meta diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java index 6076a963884..1ef5a9bdcbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java @@ -31,6 +31,7 @@ import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.Par import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange; import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.RpcException; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -38,6 +39,8 @@ import com.google.common.collect.Range; import lombok.AllArgsConstructor; import lombok.Data; import org.apache.hadoop.util.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.lang.reflect.Field; import java.time.Duration; @@ -55,6 +58,7 @@ import java.util.Optional; * and the QPS can be improved */ public class NereidsSortedPartitionsCacheManager { + private static final Logger LOG = LogManager.getLogger(NereidsSortedPartitionsCacheManager.class); private volatile Cache<TableIdentifier, PartitionCacheContext> partitionCaches; 
public NereidsSortedPartitionsCacheManager() { @@ -82,19 +86,28 @@ public class NereidsSortedPartitionsCacheManager { TableIdentifier key = new TableIdentifier( catalog.getName(), database.getFullName(), table.getName()); PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key); - if (partitionCacheContext == null) { - return Optional.ofNullable(loadCache(key, table, scan)); - } - if (table.getId() != partitionCacheContext.tableId - || !Objects.equals(table.getPartitionMetaVersion(scan), partitionCacheContext.partitionMetaVersion)) { + + try { + if (partitionCacheContext == null) { + return Optional.ofNullable(loadCache(key, table, scan)); + } + if (table.getId() != partitionCacheContext.tableId + || !Objects.equals(table.getPartitionMetaVersion(scan), + partitionCacheContext.partitionMetaVersion)) { + partitionCaches.invalidate(key); + return Optional.ofNullable(loadCache(key, table, scan)); + } + } catch (Throwable t) { + LOG.warn("Failed to load cache for table {}, key {}.", table.getName(), key, t); partitionCaches.invalidate(key); - return Optional.ofNullable(loadCache(key, table, scan)); + return Optional.empty(); } return Optional.of(partitionCacheContext.sortedPartitionRanges); } private SortedPartitionRanges<?> loadCache( - TableIdentifier key, SupportBinarySearchFilteringPartitions table, CatalogRelation scan) { + TableIdentifier key, SupportBinarySearchFilteringPartitions table, CatalogRelation scan) + throws RpcException { long now = System.currentTimeMillis(); long partitionMetaLoadTime = table.getPartitionMetaLoadTimeMillis(scan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java index 5aecf5ff4bb..73d6b7ebb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java @@ 
-63,12 +63,15 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.SqlCache; +import org.apache.doris.rpc.RpcException; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.lang.reflect.Field; import java.time.Duration; @@ -83,6 +86,7 @@ import java.util.Set; * NereidsSqlCacheManager */ public class NereidsSqlCacheManager { + private static final Logger LOG = LogManager.getLogger(NereidsSqlCacheManager.class); // key: <ctl.db>:<user>:<sql> // value: SqlCacheContext private volatile Cache<String, SqlCacheContext> sqlCaches; @@ -352,7 +356,13 @@ public class NereidsSqlCacheManager { } OlapTable olapTable = (OlapTable) tableIf; - long currentTableVersion = olapTable.getVisibleVersion(); + long currentTableVersion = 0L; + try { + currentTableVersion = olapTable.getVisibleVersion(); + } catch (RpcException e) { + LOG.warn("table {}, in cloud getVisibleVersion exception", olapTable.getName(), e); + return true; + } long cacheTableVersion = tableVersion.version; // some partitions have been dropped, or delete or updated or replaced, or insert rows into new partition? 
if (currentTableVersion != cacheTableVersion) { @@ -369,7 +379,12 @@ public class NereidsSqlCacheManager { } OlapTable olapTable = (OlapTable) tableIf; Collection<Long> partitionIds = scanTable.getScanPartitions(); - olapTable.getVersionInBatchForCloudMode(partitionIds); + try { + olapTable.getVersionInBatchForCloudMode(partitionIds); + } catch (RpcException e) { + LOG.warn("failed to get version in batch for table {}", fullTableName, e); + return true; + } for (Long scanPartitionId : scanTable.getScanPartitions()) { Partition partition = olapTable.getPartition(scanPartitionId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java index f4e39080abe..f5021cc9535 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TablesProcDir.java @@ -28,11 +28,14 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collections; @@ -45,6 +48,7 @@ import java.util.Map.Entry; * show table family groups' info within a db */ public class TablesProcDir implements ProcDirInterface { + private static final Logger LOG = LogManager.getLogger(ProcDirInterface.class); public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("TableId").add("TableName").add("IndexNum").add("PartitionColumnName") 
.add("PartitionNum").add("State").add("Type").add("LastConsistencyCheckTime").add("ReplicaCount") @@ -119,6 +123,13 @@ public class TablesProcDir implements ProcDirInterface { ++idx; } } + long version = 0; + try { + version = ((OlapTable) table).getVisibleVersion(); + } catch (RpcException e) { + LOG.warn("table {}, in cloud getVisibleVersion exception", table.getName(), e); + throw new AnalysisException(e.getMessage()); + } replicaCount = olapTable.getReplicaCount(); tableInfo.add(table.getId()); tableInfo.add(table.getName()); @@ -130,7 +141,7 @@ public class TablesProcDir implements ProcDirInterface { // last check time tableInfo.add(TimeUtils.longToTimeString(olapTable.getLastCheckTime())); tableInfo.add(replicaCount); - tableInfo.add(olapTable.getVisibleVersion()); + tableInfo.add(version); tableInfo.add(olapTable.getVisibleVersionTime()); } else { tableInfo.add(table.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 367d10529e7..dcec500ef78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2017,7 +2017,9 @@ public class InternalCatalog implements CatalogIf<Database> { } } - long version = olapTable.getVisibleVersion(); + // In cloud mode, the internal partition deletion logic will update the table version, + // so here we only need to handle non-cloud mode. 
+ long version = 0L; long versionTime = olapTable.getVisibleVersionTime(); // Only update table version if drop a non-empty partition if (partition != null && partition.hasData()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index 2278436888b..0794d0aca0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -35,6 +35,7 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.ResultSet; import org.apache.doris.qe.cache.CacheProxy; +import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; @@ -43,6 +44,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; @@ -55,6 +58,7 @@ import java.util.Set; /** SqlCacheContext */ public class SqlCacheContext { + private static final Logger LOG = LogManager.getLogger(SqlCacheContext.class); private final UserIdentity userIdentity; private final TUniqueId queryId; // if contains udf/udaf/tableValuesFunction we can not process it and skip use sql cache @@ -140,11 +144,22 @@ public class SqlCacheContext { return; } + long version = 0; + try { + if (tableIf instanceof OlapTable) { + version = ((OlapTable) tableIf).getVisibleVersion(); + } + } catch (RpcException e) { + // in cloud, getVisibleVersion throw exception, disable sql cache temporary + setHasUnsupportedTables(true); + LOG.warn("table {}, in cloud getVisibleVersion exception", tableIf.getName(), e); + } + usedTables.put( new 
FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()), new TableVersion( tableIf.getId(), - tableIf instanceof OlapTable ? ((OlapTable) tableIf).getVisibleVersion() : 0L, + version, tableIf.getType() ) ); @@ -460,7 +475,6 @@ public class SqlCacheContext { @lombok.AllArgsConstructor public static class ScanTable { public final FullTableName fullTableName; - public final long latestVersion; public final List<Long> scanPartitions = Lists.newArrayList(); public void addScanPartition(Long partitionId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index d28ba05220f..a9e2f7b3744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.RpcException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -239,8 +240,16 @@ public class LogicalOlapScan extends LogicalCatalogRelation implements OlapScan } // NOTE: embed version info avoid mismatching under data maintaining // TODO: more efficient way to ignore the ignorable data maintaining + long version = 0; + try { + version = getTable().getVisibleVersion(); + } catch (RpcException e) { + String errMsg = "table " + getTable().getName() + "in cloud getTableVisibleVersion error"; + LOG.warn(errMsg, e); + throw new IllegalStateException(errMsg); + } return Utils.toSqlString("OlapScan[" + table.getNameWithFullQualifiers() + partitions + "]" - + "#" + getRelationId() + "@" + 
getTable().getVisibleVersion() + + "#" + getRelationId() + "@" + version + "@" + getTable().getVisibleVersionTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java index 3948ebdbae5..4f911bb35a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java @@ -32,9 +32,12 @@ import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.json.JSONObject; import java.util.Collection; @@ -46,7 +49,7 @@ import java.util.Optional; * Physical olap scan plan. 
*/ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapScan { - + private static final Logger LOG = LogManager.getLogger(PhysicalOlapScan.class); private final DistributionSpec distributionSpec; private final long selectedIndexId; private final ImmutableList<Long> selectedTabletIds; @@ -139,8 +142,16 @@ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapSca } // NOTE: embed version info avoid mismatching under data maintaining // TODO: more efficient way to ignore the ignorable data maintaining + long version = 0; + try { + version = getTable().getVisibleVersion(); + } catch (RpcException e) { + String errMsg = "table " + getTable().getName() + "in cloud getTableVisibleVersion error"; + LOG.warn(errMsg, e); + throw new IllegalStateException(errMsg); + } return Utils.toSqlString("OlapScan[" + table.getNameWithFullQualifiers() + partitions + "]" - + "#" + getRelationId() + "@" + getTable().getVisibleVersion() + + "#" + getRelationId() + "@" + version + "@" + getTable().getVisibleVersionTime()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index 92aff2cc0ae..7bdffd70746 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -58,6 +58,7 @@ import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.RowBatch; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -701,12 +702,15 @@ public class CacheAnalyzer { DatabaseIf database = olapTable.getDatabase(); CatalogIf catalog = database.getCatalog(); ScanTable scanTable = new ScanTable( - new FullTableName(catalog.getName(), database.getFullName(), olapTable.getName()), - 
olapTable.getVisibleVersion()); + new FullTableName(catalog.getName(), database.getFullName(), olapTable.getName())); scanTables.add(scanTable); Collection<Long> partitionIds = node.getSelectedPartitionIds(); - olapTable.getVersionInBatchForCloudMode(partitionIds); + try { + olapTable.getVersionInBatchForCloudMode(partitionIds); + } catch (RpcException e) { + LOG.warn("Failed to get version in batch for cloud mode, partitions {}.", partitionIds, e); + } for (Long partitionId : node.getSelectedPartitionIds()) { Partition partition = olapTable.getPartition(partitionId); @@ -729,7 +733,7 @@ public class CacheAnalyzer { DatabaseIf database = tableIf.getDatabase(); CatalogIf catalog = database.getCatalog(); ScanTable scanTable = new ScanTable(new FullTableName( - catalog.getName(), database.getFullName(), tableIf.getName()), 0); + catalog.getName(), database.getFullName(), tableIf.getName())); scanTables.add(scanTable); return cacheTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index b0a91736761..55bdfa34d4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -71,6 +71,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -523,7 +524,15 @@ public class AnalysisManager implements Writable { infoBuilder.setRowCount(rowCount); TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId()); infoBuilder.setUpdateRows(tableStatsStatus == null ? 
0 : tableStatsStatus.updatedRows.get()); - infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0); + long version = 0; + try { + if (table instanceof OlapTable) { + version = ((OlapTable) table).getVisibleVersion(); + } + } catch (RpcException e) { + LOG.warn("table {}, in cloud getVisibleVersion exception", table.getName(), e); + } + infoBuilder.setTableVersion(version); infoBuilder.setPriority(JobPriority.MANUAL); infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows); infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze()); @@ -602,7 +611,15 @@ public class AnalysisManager implements Writable { infoBuilder.setRowCount(rowCount); TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId()); infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()); - infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0); + long version = 0; + try { + if (table instanceof OlapTable) { + version = ((OlapTable) table).getVisibleVersion(); + } + } catch (RpcException e) { + LOG.warn("table {}, in cloud getVisibleVersion exception", table.getName(), e); + } + infoBuilder.setTableVersion(version); infoBuilder.setPriority(JobPriority.MANUAL); infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? 
null : tableStatsStatus.partitionUpdateRows); infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 8420e86fa8e..2f287cca035 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; @@ -226,6 +227,14 @@ public class StatisticsAutoCollector extends MasterDaemon { for (Pair<String, String> pair : jobColumns) { stringJoiner.add(pair.toString()); } + long version = 0; + try { + if (table instanceof OlapTable) { + version = ((OlapTable) table).getVisibleVersion(); + } + } catch (RpcException e) { + LOG.warn("table {}, in cloud getVisibleVersion exception", table.getName(), e); + } return new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) .setCatalogId(table.getDatabase().getCatalog().getId()) @@ -246,7 +255,7 @@ public class StatisticsAutoCollector extends MasterDaemon { .setTblUpdateTime(table.getUpdateTime()) .setRowCount(rowCount) .setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()) - .setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0) + .setTableVersion(version) .setPriority(priority) .setPartitionUpdateRows(tableStatsStatus == null ? 
null : tableStatsStatus.partitionUpdateRows) .setEnablePartition(StatisticsUtil.enablePartitionAnalyze()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 52d548af0c8..f8d8daec8b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -73,6 +73,7 @@ import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColStatsMeta; @@ -1256,7 +1257,13 @@ public class StatisticsUtil { // For olap table, if the table visible version and row count doesn't change since last analyze, // we don't need to analyze it because its data is not changed. 
OlapTable olapTable = (OlapTable) table; - return olapTable.getVisibleVersion() != columnStats.tableVersion + long version = 0; + try { + version = ((OlapTable) table).getVisibleVersion(); + } catch (RpcException e) { + LOG.warn("in cloud getVisibleVersion exception", e); + } + return version != columnStats.tableVersion || olapTable.getRowCount() != columnStats.rowCount; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 522021a9771..10e446348af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -46,6 +46,7 @@ import org.apache.doris.load.routineload.RoutineLoadStatistic; import org.apache.doris.load.routineload.RoutineLoadTaskInfo; import org.apache.doris.meta.MetaContext; import org.apache.doris.persist.EditLog; +import org.apache.doris.rpc.RpcException; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TKafkaRLTaskProgress; import org.apache.doris.thrift.TLoadSourceType; @@ -1175,9 +1176,15 @@ public class GlobalTransactionMgrTest { } private void checkTableVersion(OlapTable olapTable, long visibleVersion, long nextVersion) { - LOG.info("table={}, visibleVersion={}, nextVersion={}", olapTable.getName(), olapTable.getVisibleVersion(), + long version = 0; + try { + version = olapTable.getVisibleVersion(); + } catch (RpcException e) { + // ut do nothing + } + LOG.info("table={}, visibleVersion={}, nextVersion={}", olapTable.getName(), version, olapTable.getNextVersion()); - Assert.assertEquals(visibleVersion, olapTable.getVisibleVersion()); + Assert.assertEquals(visibleVersion, version); Assert.assertEquals(nextVersion, olapTable.getNextVersion()); } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org