github-actions[bot] commented on code in PR #64167:
URL: https://github.com/apache/doris/pull/64167#discussion_r3425248796
##########
fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java:
##########
@@ -782,6 +817,9 @@ private Tag chooseProperTag(TabletSchedCtx tabletCtx,
boolean forMissingReplica)
beId = -1;
}
Backend be = infoService.getBackend(beId);
+ if (tabletCtx.getExcludeTagSet().contains(be.getLocationTag())) {
Review Comment:
This now dereferences `be` before any null guard. A tablet can still have a
replica record for a backend that has already been dropped, in which case
`infoService.getBackend(beId)` returns null. Previously, the later condition
dereferenced `be` only after the replica-state checks had a chance to
short-circuit; this new exclusion check runs unconditionally, even when
`excludeTagSet` is empty.
That can abort repair/redundant scheduling with an NPE instead of letting the
scheduler skip the dropped backend. Please guard `be == null` before reading
its tag.
```suggestion
Backend be = infoService.getBackend(beId);
if (be == null) {
continue;
}
if (tabletCtx.getExcludeTagSet().contains(be.getLocationTag())) {
continue;
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/clone/TenantLevelColocateTableCheckerAndBalancer.java:
##########
@@ -0,0 +1,470 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletHealth;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TenantLevelColocateGroupSchema;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex;
+import org.apache.doris.catalog.TenantLevelColocateTableIndex.GroupV2Id;
+import org.apache.doris.clone.TabletChecker.CheckerCounter;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletScheduler.AddResult;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Reference;
+import org.apache.doris.persist.ModifyTenantLevelColocateMapInfo;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * TenantLevelColocateTableCheckerAndBalancer is responsible for tablets'
repair and balance of colocated tables.
+ */
+public class TenantLevelColocateTableCheckerAndBalancer extends
ColocateTableCheckerAndBalancer {
+ private static final Logger LOG =
LogManager.getLogger(TenantLevelColocateTableCheckerAndBalancer.class);
+
+ private TenantLevelColocateTableCheckerAndBalancer(long intervalMs) {
+ super("colocate group clone checker v2", intervalMs);
+ }
+
+ private static volatile TenantLevelColocateTableCheckerAndBalancer
INSTANCE = null;
+
+ public static TenantLevelColocateTableCheckerAndBalancer getInstance() {
+ if (INSTANCE == null) {
+ synchronized (TenantLevelColocateTableCheckerAndBalancer.class) {
+ if (INSTANCE == null) {
+ INSTANCE = new
TenantLevelColocateTableCheckerAndBalancer(Config.tablet_checker_interval_ms);
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ @Override
+ public void runAfterCatalogReady() {
+ relocateAndBalanceGroups();
+ matchGroups();
+ }
+
+ private void relocateAndBalanceGroups() {
+ Set<GroupV2Id> groupIds =
Env.getCurrentEnv().getTenantLevelColocateTableIndex().getAllGroupIds();
+
+ // balance only inside each group, excluded balance between all groups
+ Set<GroupV2Id> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+ if (!Config.disable_colocate_balance_between_groups
+ && !changeGroups.isEmpty()) {
+ // balance both inside each group and between all groups
+ relocateAndBalanceGroup(changeGroups, true);
+ }
+ }
+
+ private Set<GroupV2Id> relocateAndBalanceGroup(Set<GroupV2Id> groupIds,
boolean balanceBetweenGroups) {
+ Set<GroupV2Id> changeGroups = Sets.newHashSet();
+ if (Config.disable_colocate_balance) {
+ return changeGroups;
+ }
+
+ GlobalColocateStatistic globalColocateStatistic =
buildGlobalColocateStatistic();
+
+ // get all groups
+ for (GroupV2Id groupId : groupIds) {
+ try {
+ relocateAndBalanceGroup(groupId, balanceBetweenGroups,
changeGroups, globalColocateStatistic);
+ } catch (Exception e) {
+ LOG.error("relocate group {} failed.", groupId, e);
+ }
+ }
+
+ return changeGroups;
+ }
+
+ private void relocateAndBalanceGroup(GroupV2Id groupId, boolean
balanceBetweenGroups, Set<GroupV2Id> changeGroups,
+ GlobalColocateStatistic globalColocateStatistic) {
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+
+ Map<Tag, LoadStatisticForTag> statisticMap =
env.getTabletScheduler().getStatisticMap();
+ if (statisticMap == null) {
+ return;
+ }
+
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ return;
+ }
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+ try {
+ Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
+ } catch (DdlException e) {
+ colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
+ return;
+ }
+
+ Tag tag = groupSchema.getTag();
+ LoadStatisticForTag statistic = statisticMap.get(tag);
+ if (statistic == null) {
+ return;
+ }
+ List<List<Long>> backendsPerBucketSeq =
colocateIndex.getBackendsPerBucketSeqByGroup(groupId);
+ if (backendsPerBucketSeq.isEmpty()) {
+ return;
+ }
+
+ // get all unavailable backends in the backend bucket sequence of this
group
+ Set<Long> unavailableBeIdsInGroup = getUnavailableBeIdsInGroup(
+ infoService, colocateIndex, groupId, tag);
+ // get all available backends for this group
+ List<Long> availableBeIds = getAvailableBeIds(tag,
Collections.emptySet(),
+ infoService);
+ // try relocate or balance this group for specified tag
+ List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
+ if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup,
availableBeIds, colocateIndex,
+ infoService, statistic, globalColocateStatistic,
balancedBackendsPerBucketSeq,
+ balanceBetweenGroups)) {
+ if (!colocateIndex.addBackendsPerBucketSeq(groupId,
balancedBackendsPerBucketSeq, replicaAlloc)) {
+ LOG.warn("relocate group {} succ, but replica allocation has
change, old replica alloc {}",
+ groupId, replicaAlloc);
+ return;
+ }
+ colocateIndex.markMasterGroupUnstable(groupId, "relocated", true);
+ colocateIndex.markSlaveGroupUnstable(groupId, "master is
unstable", true);
+ changeGroups.add(groupId);
+ Map<GroupV2Id, List<List<Long>>> balancedBackendsPerBucketSeqMap =
Maps.newHashMap();
+ balancedBackendsPerBucketSeqMap.put(groupId,
balancedBackendsPerBucketSeq);
+ ModifyTenantLevelColocateMapInfo info = new
ModifyTenantLevelColocateMapInfo(
+ balancedBackendsPerBucketSeqMap);
+ env.getEditLog().logTenantLevelColocateBackendsPerBucketSeq(info);
+ LOG.info("balance group {}. now backends per bucket sequence for
tag {} is: {}",
+ groupId, tag, balancedBackendsPerBucketSeq);
+ }
+ }
+
+ /*
+ * Check every tablet of a group, if replica's location does not match
backends in group, relocating those
+ * replicas, and mark that group as unstable.
+ * If every replicas match the backends in group, mark that group as
stable.
+ */
+ private void matchGroups() {
+ long start = System.currentTimeMillis();
+ CheckerCounter counter = new CheckerCounter();
+
+ Env env = Env.getCurrentEnv();
+ TenantLevelColocateTableIndex colocateIndex =
env.getTenantLevelColocateTableIndex();
+
+ // check each group
+ Set<GroupV2Id> groupIds = colocateIndex.getAllGroupIds();
+ for (GroupV2Id groupId : groupIds) {
+ TenantLevelColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ LOG.info("Not found colocate group {}, maybe delete", groupId);
+ continue;
+ }
+
+ List<Set<Long>> backendBucketsSeq =
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+
+ String unstableReason = matchMasterGroup(env, counter, groupId,
backendBucketsSeq);
Review Comment:
This matching pass still has no per-group isolation: `matchMasterGroup()` /
`matchSlaveGroup()` can throw from the `Preconditions.checkState(...)` checks
in `matchPartition()` (for example when one group's bucket map diverges from a
partition/index shape), and the exception escapes `matchGroups()` entirely.
Then every group after this one is skipped for health checking and repair on
every daemon round while the bad group persists, which breaks the PR's
tenant-level isolation goal. This is distinct from the existing review thread
on the relocate loop because it concerns the health-matching phase, not
balancing. Please wrap the per-`groupId` body in `try/catch` and mark/log only
that group, similar to the updated relocate loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]