This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 69b523ddffb branch-4.1: [fix](fe) avoid concurrent tablet stat iteration failures #63298 (#63560)
69b523ddffb is described below
commit 69b523ddffb2e4dd6b8066aebc1e50e7774046e9
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 25 13:21:04 2026 +0800
branch-4.1: [fix](fe) avoid concurrent tablet stat iteration failures #63298 (#63560)
Cherry-picked from #63298
Co-authored-by: yaoxiao <[email protected]>
Co-authored-by: morningman <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../doris/alter/MaterializedViewHandler.java | 17 +++-
.../apache/doris/alter/SchemaChangeHandler.java | 19 ++++-
.../java/org/apache/doris/backup/RestoreJob.java | 9 +-
.../apache/doris/catalog/CloudTabletStatMgr.java | 1 +
.../java/org/apache/doris/catalog/LocalTablet.java | 96 +++++++++++++---------
.../apache/doris/catalog/MaterializedIndex.java | 89 ++++++++++++++++----
.../java/org/apache/doris/catalog/OlapTable.java | 7 +-
.../org/apache/doris/catalog/TabletStatMgr.java | 18 +++-
.../apache/doris/cloud/backup/CloudRestoreJob.java | 8 +-
.../cloud/datasource/CloudInternalCatalog.java | 12 ++-
.../apache/doris/datasource/InternalCatalog.java | 14 +++-
.../doris/catalog/MaterializedIndexTest.java | 65 +++++++++++++++
.../java/org/apache/doris/catalog/TabletTest.java | 79 ++++++++++++++++++
13 files changed, 360 insertions(+), 74 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 37ec3255f1b..5e3e64bf8c8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -437,14 +437,22 @@ public class MaterializedViewHandler extends AlterHandler
{
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId,
IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
short replicationNum =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
+ // All MV tablets of the same (partition, mv index) share the same
TabletMeta;
+ // build it once and bulk-publish to MaterializedIndex.tablets
after the per-tablet
+ // loop to keep copy-on-write O(n). TabletInvertedIndex
registration stays
+ // per-iteration because Tablet.addReplica(...) below needs the
tablet present
+ // in the inverted index.
+ TabletMeta mvTabletMeta = new TabletMeta(
+ dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium);
+ List<Tablet> mvTabletsForPartition =
Lists.newArrayListWithCapacity(baseIndex.getTablets().size());
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet baseTablet : baseIndex.getTablets()) {
- TabletMeta mvTabletMeta = new TabletMeta(
- dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
Tablet newTablet =
EnvFactory.getInstance().createTablet(mvTabletId);
- mvIndex.addTablet(newTablet, mvTabletMeta);
+ invertedIndex.addTablet(mvTabletId, mvTabletMeta);
+ mvTabletsForPartition.add(newTablet);
addedTablets.add(newTablet);
mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
@@ -499,6 +507,9 @@ public class MaterializedViewHandler extends AlterHandler {
}
} // end for baseTablets
+ // Bulk-publish all MV tablets for this partition in one
copy-on-write.
+ mvIndex.appendTablets(mvTabletsForPartition);
+
mvJob.addMVIndex(partitionId, mvIndex);
if (LOG.isDebugEnabled()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 42e25ea8f06..e791c65088c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -66,6 +66,7 @@ import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.AnalysisException;
@@ -1680,14 +1681,23 @@ public class SchemaChangeHandler extends AlterHandler {
MaterializedIndex originIndex =
partition.getIndex(originIndexId);
ReplicaAllocation replicaAlloc =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ // All shadow tablets of the same (partition, shadow index)
share the same TabletMeta;
+ // build it once and bulk-publish to MaterializedIndex.tablets
after the per-tablet
+ // loop to keep copy-on-write O(n). TabletInvertedIndex
registration stays
+ // per-iteration because Tablet.addReplica(...) below needs
the tablet present
+ // in the inverted index.
+ TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId,
partitionId, shadowIndexId,
+ newSchemaHash, medium);
+ List<Tablet> shadowTabletsForPartition =
Lists.newArrayListWithCapacity(
+ originIndex.getTablets().size());
+ TabletInvertedIndex invertedIndex =
Env.getCurrentInvertedIndex();
for (Tablet originTablet : originIndex.getTablets()) {
- TabletMeta shadowTabletMeta = new TabletMeta(dbId,
tableId, partitionId, shadowIndexId,
- newSchemaHash, medium);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
Tablet shadowTablet =
EnvFactory.getInstance().createTablet(shadowTabletId);
- shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
+ invertedIndex.addTablet(shadowTabletId, shadowTabletMeta);
+ shadowTabletsForPartition.add(shadowTablet);
addedTablets.add(shadowTablet);
schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId,
shadowTabletId, originTabletId);
@@ -1745,6 +1755,9 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
+ // Bulk-publish all shadow tablets for this partition in one
copy-on-write.
+ shadowIndex.appendTablets(shadowTabletsForPartition);
+
schemaChangeJob.addPartitionShadowIndex(partitionId,
shadowIndexId, shadowIndex);
} // end for partition
schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId,
newIndexName, newSchemaVersion, newSchemaHash,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index e7bb52f98e3..4e007723f49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -107,6 +107,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -1525,12 +1526,13 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
int schemaHash =
remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
remoteIdx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n) for
the whole index.
+ List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
Tablet newTablet =
EnvFactory.getInstance().createTablet(newTabletId);
- // add tablet to index, but not add to TabletInvertedIndex
- remoteIdx.addTablet(newTablet, null /* tablet meta */, true /*
is restore */);
+ newTablets.add(newTablet);
// replicas
try {
Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium
= Env.getCurrentSystemInfo()
@@ -1549,6 +1551,9 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
return null;
}
}
+ // add tablets to index in one batch; TabletInvertedIndex
registration
+ // is intentionally skipped on the restore path (rebuilt
separately).
+ remoteIdx.appendTablets(newTablets);
}
return remotePart;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index b28cc369d46..70ab983c231 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -370,6 +370,7 @@ public class CloudTabletStatMgr extends MasterDaemon {
long tabletIndexSize = 0L;
long tabletSegmentSize = 0L;
+ // getReplicas() returns an immutable volatile
snapshot; CME-safe under concurrent DDL.
for (Replica replica : tablet.getReplicas()) {
if (replica.getDataSize() >
tabletDataSize) {
tabletDataSize = replica.getDataSize();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
index 14d9171f3ff..5db0a1286b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java
@@ -31,8 +31,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;
@@ -40,7 +40,7 @@ public class LocalTablet extends Tablet {
private static final Logger LOG = LogManager.getLogger(LocalTablet.class);
@SerializedName(value = "rs", alternate = {"replicas"})
- private List<Replica> replicas;
+ private volatile List<Replica> replicas;
@SerializedName(value = "lastCheckTime")
private long lastCheckTime;
@@ -109,13 +109,14 @@ public class LocalTablet extends Tablet {
if (cooldownReplicaId <= 0) {
return 0;
}
- for (Replica r : replicas) {
+ List<Replica> snapshot = replicas; // single volatile read; reuse below
+ for (Replica r : snapshot) {
if (r.getId() == cooldownReplicaId) {
return r.getRemoteDataSize();
}
}
// return replica with max remoteDataSize
- return
replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
+ return
snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
}
@Override
@@ -223,29 +224,32 @@ public class LocalTablet extends Tablet {
this.lastCheckTime = lastCheckTime;
}
- private boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
- boolean delete = false;
+ // Writers are synchronized on this tablet to prevent concurrent
lost-update:
+ // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT
hold
+ // the OlapTable write lock when modifying replicas.
+ // Readers capture the volatile reference once and iterate freely — no
lock needed.
+
+ @Override
+ public synchronized void addReplica(Replica replica, boolean isRestore) {
+ long version = replica.getVersion();
+ long backendId = replica.getBackendIdWithoutException();
boolean hasBackend = false;
- long version = newReplica.getVersion();
- Iterator<Replica> iterator = replicas.iterator();
- while (iterator.hasNext()) {
- Replica replica = iterator.next();
- if (replica.getBackendIdWithoutException() ==
newReplica.getBackendIdWithoutException()) {
+ boolean deletedOld = false;
+ List<Replica> current = replicas;
+ List<Replica> next = new ArrayList<>(current.size() + 1);
+ for (Replica r : current) {
+ if (r.getBackendIdWithoutException() == backendId) {
hasBackend = true;
- if (replica.getVersion() <= version) {
- iterator.remove();
- delete = true;
+ if (r.getVersion() <= version) {
+ deletedOld = true;
+ continue; // drop stale replica
}
}
+ next.add(r);
}
-
- return delete || !hasBackend;
- }
-
- @Override
- public void addReplica(Replica replica, boolean isRestore) {
- if (isLatestReplicaAndDeleteOld(replica)) {
- replicas.add(replica);
+ if (deletedOld || !hasBackend) {
+ next.add(replica);
+ replicas = next; // volatile write; readers see the new immutable
snapshot
if (!isRestore) {
Env.getCurrentInvertedIndex().addReplica(id, replica);
}
@@ -254,12 +258,13 @@ public class LocalTablet extends Tablet {
@Override
public List<Replica> getReplicas() {
- return this.replicas;
+ // Volatile read: returns the current immutable snapshot; callers
iterate without locking.
+ return Collections.unmodifiableList(replicas);
}
@Override
public Replica getReplicaByBackendId(long backendId) {
- for (Replica replica : replicas) {
+ for (Replica replica : replicas) { // single volatile read
if (replica.getBackendIdWithoutException() == backendId) {
return replica;
}
@@ -268,9 +273,12 @@ public class LocalTablet extends Tablet {
}
@Override
- public boolean deleteReplica(Replica replica) {
- if (replicas.contains(replica)) {
- replicas.remove(replica);
+ public synchronized boolean deleteReplica(Replica replica) {
+ List<Replica> current = replicas;
+ if (current.contains(replica)) {
+ List<Replica> next = new ArrayList<>(current);
+ next.remove(replica);
+ replicas = next; // volatile write
Env.getCurrentInvertedIndex().deleteReplica(id,
replica.getBackendIdWithoutException());
return true;
}
@@ -278,16 +286,22 @@ public class LocalTablet extends Tablet {
}
@Override
- public boolean deleteReplicaByBackendId(long backendId) {
- Iterator<Replica> iterator = replicas.iterator();
- while (iterator.hasNext()) {
- Replica replica = iterator.next();
+ public synchronized boolean deleteReplicaByBackendId(long backendId) {
+ List<Replica> current = replicas;
+ List<Replica> next = new ArrayList<>(current.size());
+ Replica found = null;
+ for (Replica replica : current) {
if (replica.getBackendIdWithoutException() == backendId) {
- iterator.remove();
- Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
- return true;
+ found = replica;
+ } else {
+ next.add(replica);
}
}
+ if (found != null) {
+ replicas = next; // volatile write
+ Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
+ return true;
+ }
return false;
}
@@ -302,13 +316,17 @@ public class LocalTablet extends Tablet {
LocalTablet tablet = (LocalTablet) obj;
- if (replicas != tablet.replicas) {
- if (replicas.size() != tablet.replicas.size()) {
+ // Capture one snapshot per side so a concurrent writer cannot publish
+ // a different list between size/contains/get calls below.
+ List<Replica> thisReplicas = replicas;
+ List<Replica> otherReplicas = tablet.replicas;
+ if (thisReplicas != otherReplicas) {
+ if (thisReplicas.size() != otherReplicas.size()) {
return false;
}
- int size = replicas.size();
+ int size = thisReplicas.size();
for (int i = 0; i < size; i++) {
- if (!tablet.replicas.contains(replicas.get(i))) {
+ if (!otherReplicas.contains(thisReplicas.get(i))) {
return false;
}
}
@@ -334,7 +352,7 @@ public class LocalTablet extends Tablet {
}
boolean allBeAliveOrDecommissioned = true;
- for (Replica replica : replicas) {
+ for (Replica replica : replicas) { // single volatile read; iteration
on the snapshot
Backend backend =
infoService.getBackend(replica.getBackendIdWithoutException());
if (backend == null || (!backend.isAlive() &&
!backend.isDecommissioned())) {
allBeAliveOrDecommissioned = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
index b1c84361313..81d9e1f7dd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
@@ -19,10 +19,11 @@ package org.apache.doris.catalog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,10 +58,16 @@ public class MaterializedIndex extends MetaObject
implements GsonPostProcessable
@SerializedName(value = "rowCount")
private long rowCount;
- private Map<Long, Tablet> idToTablets;
+ // Published as a volatile immutable snapshot in lockstep with `tablets`.
+ // Writers (synchronized) build a fresh HashMap and assign the field;
readers
+ // capture the reference once and call get/containsKey on the snapshot.
+ // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is
also
+ // present in the map. This is preserved by publishing the map BEFORE the
list
+ // on add and the list BEFORE the map on clear.
+ private volatile Map<Long, Tablet> idToTablets;
@SerializedName(value = "tablets")
// this is for keeping tablet order
- private List<Tablet> tablets;
+ private volatile List<Tablet> tablets;
// for push after rollup index finished
@SerializedName(value = "rollupIndexId")
@@ -94,38 +101,78 @@ public class MaterializedIndex extends MetaObject
implements GsonPostProcessable
}
public List<Tablet> getTablets() {
- return tablets;
+ // Volatile read: returns the current immutable snapshot; callers
iterate without locking.
+ return Collections.unmodifiableList(tablets);
}
public List<Long> getTabletIdsInOrder() {
- List<Long> tabletIds = Lists.newArrayListWithCapacity(tablets.size());
- for (Tablet tablet : tablets) {
+ List<Tablet> snapshot = tablets; // single volatile read
+ List<Long> tabletIds = new ArrayList<>(snapshot.size());
+ for (Tablet tablet : snapshot) {
tabletIds.add(tablet.getId());
}
return tabletIds;
}
public Tablet getTablet(long tabletId) {
+ // Single volatile read of the immutable map snapshot.
return idToTablets.get(tabletId);
}
- public void clearTabletsForRestore() {
- idToTablets.clear();
- tablets.clear();
+ public synchronized void clearTabletsForRestore() {
+ // Drop the list first so iteration stops seeing tablets before
+ // lookup-by-id drops them. Maintains tablets ⊆ idToTablets.
+ tablets = new ArrayList<>();
+ idToTablets = new HashMap<>();
}
- public void addTablet(Tablet tablet, TabletMeta tabletMeta) {
+ public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) {
addTablet(tablet, tabletMeta, false);
}
- public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean
isRestore) {
- idToTablets.put(tablet.getId(), tablet);
- tablets.add(tablet);
+ // Writers are synchronized on this index to prevent concurrent
lost-update:
+ // some callers (e.g. InternalCatalog.createTablets) do NOT hold the
OlapTable
+ // write lock when adding tablets.
+ // Copy-on-write keeps readers CME-safe without locking; for bulk creation
use
+ // appendTablets(...) so the per-index tablets list is copied once per
batch
+ // instead of once per tablet.
+ public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta,
boolean isRestore) {
+ appendTabletsInternal(Collections.singletonList(tablet));
if (!isRestore) {
Env.getCurrentInvertedIndex().addTablet(tablet.getId(),
tabletMeta);
}
}
+ // Bulk-publish: append the given tablets to this index's tablets list in a
+ // single copy-on-write (O(existing + batch) instead of O(n^2) over n
+ // single-tablet adds inside a synchronized block).
+ //
+ // Does NOT touch TabletInvertedIndex. Bulk-creation callers register
tablets
+ // in TabletInvertedIndex eagerly inside their per-tablet loop because
+ // Tablet.addReplica(...) (non-restore) requires the tablet to already be
+ // present in the inverted index; only the per-index list copy is expensive
+ // enough to be worth batching.
+ public synchronized void appendTablets(Collection<Tablet> newTablets) {
+ appendTabletsInternal(newTablets);
+ }
+
+ private void appendTabletsInternal(Collection<Tablet> newTablets) {
+ if (newTablets.isEmpty()) {
+ return;
+ }
+ Map<Long, Tablet> nextMap = new HashMap<>(idToTablets);
+ List<Tablet> nextList = new ArrayList<>(tablets.size() +
newTablets.size());
+ nextList.addAll(tablets);
+ for (Tablet tablet : newTablets) {
+ nextMap.put(tablet.getId(), tablet);
+ nextList.add(tablet);
+ }
+ // Publish the map first, then the list — so any id that appears in the
+ // visible `tablets` snapshot is already present in `idToTablets`.
+ idToTablets = nextMap;
+ tablets = nextList;
+ }
+
public void setIdForRestore(long idxId) {
this.id = idxId;
}
@@ -233,8 +280,9 @@ public class MaterializedIndex extends MetaObject
implements GsonPostProcessable
}
public int getTabletOrderIdx(long tabletId) {
+ List<Tablet> snapshot = tablets; // single volatile read
int idx = 0;
- for (Tablet tablet : tablets) {
+ for (Tablet tablet : snapshot) {
if (tablet.getId() == tabletId) {
return idx;
}
@@ -271,15 +319,16 @@ public class MaterializedIndex extends MetaObject
implements GsonPostProcessable
@Override
public String toString() {
+ List<Tablet> snapshot = tablets; // single volatile read
StringBuilder buffer = new StringBuilder();
buffer.append("index id: ").append(id).append("; ");
buffer.append("index state: ").append(state.name()).append("; ");
buffer.append("row count: ").append(rowCount).append("; ");
- buffer.append("tablets size: ").append(tablets.size()).append("; ");
+ buffer.append("tablets size: ").append(snapshot.size()).append("; ");
//
buffer.append("tablets: [");
- for (Tablet tablet : tablets) {
+ for (Tablet tablet : snapshot) {
buffer.append("tablet: ").append(tablet.toString()).append(", ");
}
buffer.append("]; ");
@@ -292,9 +341,13 @@ public class MaterializedIndex extends MetaObject
implements GsonPostProcessable
@Override
public void gsonPostProcess() {
- // build "idToTablets" from "tablets"
+ // Build a fresh "idToTablets" snapshot from the deserialized
"tablets" list.
+ // Runs single-threaded during gson deserialization, before any
concurrent
+ // reader can observe this object.
+ Map<Long, Tablet> map = new HashMap<>(tablets.size());
for (Tablet tablet : tablets) {
- idToTablets.put(tablet.getId(), tablet);
+ map.put(tablet.getId(), tablet);
}
+ idToTablets = map;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 5555339ed63..b506b14179b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -945,10 +945,12 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
// generate new tablets in origin tablet order
int tabletNum = idx.getTablets().size();
idx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n)
for the whole index.
+ List<Tablet> newTablets = new ArrayList<>(tabletNum);
for (int i = 0; i < tabletNum; i++) {
long newTabletId = env.getNextId();
Tablet newTablet =
EnvFactory.getInstance().createTablet(newTabletId);
- idx.addTablet(newTablet, null /* tablet meta */, true /*
is restore */);
+ newTablets.add(newTablet);
// replicas
if (Config.isCloudMode()) {
long newReplicaId = Env.getCurrentEnv().getNextId();
@@ -988,6 +990,9 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
return new Status(ErrCode.COMMON_ERROR,
e.getMessage());
}
}
+ // add tablets to index in one batch; TabletInvertedIndex
registration
+ // is intentionally skipped on the restore path (rebuilt
separately).
+ idx.appendTablets(newTablets);
}
if (createNewColocateGroup) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index a493aecc4ac..7cdb4231504 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -369,7 +369,14 @@ public class TabletStatMgr extends MasterDaemon {
if (result.isSetTabletStatList()) {
for (TTabletStat stat : result.getTabletStatList()) {
if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) {
- Replica replica =
invertedIndex.getReplica(stat.getTabletId(), beId);
+ Replica replica;
+ try {
+ replica = invertedIndex.getReplica(stat.getTabletId(),
beId);
+ } catch (IllegalStateException e) {
+ LOG.debug("skip stale tablet stat update for tablet {}
on backend {}: {}",
+ stat.getTabletId(), beId, e.getMessage());
+ continue;
+ }
if (replica != null) {
replica.setDataSize(stat.getDataSize());
replica.setRemoteDataSize(stat.getRemoteDataSize());
@@ -393,7 +400,14 @@ public class TabletStatMgr extends MasterDaemon {
// the replica is obsolete, ignore it.
continue;
}
- Replica replica = invertedIndex.getReplica(entry.getKey(),
beId);
+ Replica replica;
+ try {
+ replica = invertedIndex.getReplica(entry.getKey(), beId);
+ } catch (IllegalStateException e) {
+ LOG.debug("skip stale tablet stat update for tablet {} on
backend {}: {}",
+ entry.getKey(), beId, e.getMessage());
+ continue;
+ }
if (replica == null) {
// replica may be deleted from catalog, ignore it.
continue;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
index b1c111ee4a1..4b724199a13 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
@@ -333,18 +333,22 @@ public class CloudRestoreJob extends RestoreJob {
int schemaHash =
remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
remoteIdx.clearTabletsForRestore();
+ // Collect locally and bulk-publish to keep copy-on-write O(n) for
the whole index.
+ List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
Tablet newTablet =
EnvFactory.getInstance().createTablet(newTabletId);
- // add tablet to index, but not add to TabletInvertedIndex
- remoteIdx.addTablet(newTablet, null /* tablet meta */, true /*
is restore */);
+ newTablets.add(newTablet);
// replicas
long newReplicaId = Env.getCurrentEnv().getNextId();
Replica replica = new CloudReplica(newReplicaId, null,
Replica.ReplicaState.NORMAL,
visibleVersion, schemaHash, dbId, localTbl.getId(),
partitionId, remoteIdx.getId(), i);
newTablet.addReplica(replica, true /* is restore */);
}
+ // add tablets to index in one batch; TabletInvertedIndex
registration
+ // is intentionally skipped on the restore path (rebuilt
separately).
+ remoteIdx.appendTablets(newTablets);
}
return remotePart;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 0c7891c50b2..0b2bd821459 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -39,6 +39,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudPartition;
@@ -80,6 +81,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -432,11 +434,16 @@ public class CloudInternalCatalog extends InternalCatalog
{
private void createCloudTablets(MaterializedIndex index, ReplicaState
replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation
replicaAlloc,
TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+ // Collect bucket tablets locally and bulk-publish to the
MaterializedIndex's
+ // tablets list in a single copy-on-write after the loop (see
+ // InternalCatalog.createTablets for rationale).
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ List<Tablet> bucketTablets = new
ArrayList<>(distributionInfo.getBucketNum());
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
Tablet tablet =
EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId());
- // add tablet to inverted index first
- index.addTablet(tablet, tabletMeta);
+ invertedIndex.addTablet(tablet.getId(), tabletMeta);
+ bucketTablets.add(tablet);
tabletIdSet.add(tablet.getId());
long replicaId = Env.getCurrentEnv().getNextId();
@@ -445,6 +452,7 @@ public class CloudInternalCatalog extends InternalCatalog {
tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
tablet.addReplica(replica);
}
+ index.appendTablets(bucketTablets);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 96230b04665..b5ca1a571f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3494,12 +3494,19 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
+ // Collect bucket tablets locally and bulk-publish to the
MaterializedIndex's
+ // tablets list in a single copy-on-write after the loop (O(bucketNum)
instead
+ // of O(bucketNum^2)). TabletInvertedIndex registration stays
per-iteration
+ // because Tablet.addReplica(...) below needs the tablet present in the
+ // inverted index.
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ List<Tablet> bucketTablets = new
ArrayList<>(distributionInfo.getBucketNum());
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
Tablet tablet =
EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId());
- // add tablet to inverted index first
- index.addTablet(tablet, tabletMeta);
+ invertedIndex.addTablet(tablet.getId(), tabletMeta);
+ bucketTablets.add(tablet);
tabletIdSet.add(tablet.getId());
// get BackendIds
@@ -3539,6 +3546,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
totalReplicaNum + " vs. " +
replicaAlloc.getTotalReplicaNum());
}
+ // Publish all bucket tablets to the materialized index in one batch.
+ index.appendTablets(bucketTablets);
+
if (groupId != null && chooseBackendsArbitrary) {
colocateIndex.addBackendsPerBucketSeq(groupId,
backendsPerBucketSeq);
ColocatePersistInfo info =
ColocatePersistInfo.createForBackendsPerBucketSeq(groupId,
backendsPerBucketSeq);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
index 25b94bc59e8..bf10b255676 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TStorageMedium;
import mockit.Mocked;
import org.junit.Assert;
@@ -34,6 +35,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class MaterializedIndexTest {
@@ -66,6 +69,68 @@ public class MaterializedIndexTest {
Assert.assertEquals(indexId, index.getId());
}
+    /**
+     * getTablets() must hand back a point-in-time, read-only list: tablets added later
+     * are not reflected in it, and it rejects direct modification.
+     */
+    @Test
+    public void testGetTabletsReturnsImmutableSnapshot() {
+        TabletMeta meta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+        index.addTablet(new LocalTablet(1L), meta, true);
+
+        List<Tablet> taken = index.getTablets();
+        Assert.assertEquals(1, taken.size());
+
+        // Publish a second tablet: the previously taken list keeps its old size,
+        // while a fresh call observes both tablets.
+        index.addTablet(new LocalTablet(2L), meta, true);
+        Assert.assertEquals(1, taken.size());
+        Assert.assertEquals(2, index.getTablets().size());
+
+        // Writing through the returned list is not allowed.
+        Assert.assertThrows(UnsupportedOperationException.class, () -> taken.add(new LocalTablet(3L)));
+    }
+
+    @Test
+    public void testConcurrentGetTabletsNeverThrows() throws InterruptedException {
+        // A reader repeatedly snapshots and iterates getTablets() while a writer keeps
+        // adding tablets. Copy-on-write guarantees the reader never observes a partially
+        // built list or throws ConcurrentModificationException.
+        // Failures on EITHER thread are recorded in `error`: without this, an exception on
+        // the writer thread would only terminate that thread and the test would still pass.
+        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD);
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        Thread writer = new Thread(() -> {
+            try {
+                long id = 1000L;
+                while (!stop.get()) {
+                    index.addTablet(new LocalTablet(id++), tabletMeta, true);
+                    // Keep the list bounded (and exercise the clear path) so the test stays fast.
+                    if (index.getTablets().size() > 64) {
+                        index.clearTabletsForRestore();
+                    }
+                }
+            } catch (Throwable t) {
+                // Keep the first failure; the reader polls `error` and exits early.
+                error.compareAndSet(null, t);
+            }
+        });
+
+        Thread reader = new Thread(() -> {
+            try {
+                for (int i = 0; i < 50000 && error.get() == null; i++) {
+                    for (Tablet tablet : index.getTablets()) {
+                        tablet.getId();
+                    }
+                }
+            } catch (Throwable t) {
+                error.compareAndSet(null, t);
+            } finally {
+                stop.set(true);
+            }
+        });
+
+        writer.start();
+        reader.start();
+        reader.join();
+        stop.set(true);
+        writer.join();
+
+        Throwable failure = error.get();
+        if (failure != null) {
+            // Attach the original throwable as the cause so its stack trace is reported.
+            throw new AssertionError(
+                    "getTablets() iteration threw under concurrent mutation: " + failure, failure);
+        }
+    }
+
@Test
public void testSerialization() throws Exception {
// 1. Write objects to file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index 27fd954e331..144e61e8236 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -37,6 +37,9 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
public class TabletTest {
@@ -124,6 +127,82 @@ public class TabletTest {
Assert.assertEquals(1, tablet.getReplicas().size());
}
+    /**
+     * getReplicas() must hand back a frozen, read-only list: replicas added to the tablet
+     * afterwards do not show up in it, and writes through it are rejected.
+     */
+    @Test
+    public void testGetReplicasReturnsImmutableSnapshot() {
+        List<Replica> frozen = tablet.getReplicas();
+        Assert.assertEquals(3, frozen.size());
+
+        // Mutating the tablet afterwards must leave the earlier list untouched.
+        Replica extraReplica = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L,
+                ReplicaState.NORMAL, 0, 0);
+        tablet.addReplica(extraReplica);
+        Assert.assertEquals(3, frozen.size());
+        Assert.assertEquals(4, tablet.getReplicas().size());
+
+        // Direct modification of the returned list is not permitted.
+        Assert.assertThrows(UnsupportedOperationException.class, () -> frozen.add(extraReplica));
+    }
+
+    @Test
+    public void testIterateReplicasWhileMutatingDoesNotThrow() {
+        // Structural changes to the tablet in the middle of a for-each over getReplicas()
+        // must neither raise ConcurrentModificationException nor change how many
+        // elements the in-flight iteration visits.
+        List<Replica> replicasAtStart = tablet.getReplicas();
+        int visited = 0;
+        for (Replica current : replicasAtStart) {
+            Assert.assertNotNull(current);
+            long freshId = 100L + visited;
+            Replica added = new LocalReplica(freshId, freshId, 100L, 0, 200000L, 0, 3000L,
+                    ReplicaState.NORMAL, 0, 0);
+            tablet.addReplica(added);
+            tablet.deleteReplicaByBackendId(2L);
+            visited += 1;
+        }
+        Assert.assertEquals(3, visited);
+    }
+
+    @Test
+    public void testConcurrentGetReplicasNeverThrows() throws InterruptedException {
+        // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps
+        // mutating the replica list. Copy-on-write guarantees the reader never observes a
+        // partially built list or throws ConcurrentModificationException.
+        // Failures on EITHER thread are recorded in `error`: without this, an exception on
+        // the writer thread would only terminate that thread and the test would still pass.
+        AtomicReference<Throwable> error = new AtomicReference<>();
+        AtomicBoolean stop = new AtomicBoolean(false);
+
+        Thread writer = new Thread(() -> {
+            try {
+                long id = 1000L;
+                while (!stop.get()) {
+                    // Reuse a small set of backend ids so the list stays bounded while still
+                    // exercising the add/replace path.
+                    long beId = id % 8;
+                    tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L,
+                            ReplicaState.NORMAL, 0, 0), true);
+                    id++;
+                }
+            } catch (Throwable t) {
+                // Keep the first failure; the reader polls `error` and exits early.
+                error.compareAndSet(null, t);
+            }
+        });
+
+        Thread reader = new Thread(() -> {
+            try {
+                for (int i = 0; i < 50000 && error.get() == null; i++) {
+                    for (Replica r : tablet.getReplicas()) {
+                        r.getId();
+                    }
+                }
+            } catch (Throwable t) {
+                error.compareAndSet(null, t);
+            } finally {
+                stop.set(true);
+            }
+        });
+
+        writer.start();
+        reader.start();
+        reader.join();
+        stop.set(true);
+        writer.join();
+
+        Throwable failure = error.get();
+        if (failure != null) {
+            // Attach the original throwable as the cause so its stack trace is reported.
+            throw new AssertionError(
+                    "getReplicas() iteration threw under concurrent mutation: " + failure, failure);
+        }
+    }
+
@Test
public void testSerialization() throws Exception {
final Path path = Files.createTempFile("olapTabletTest", "tmp");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]