kangkaisen commented on a change in pull request #5010: URL: https://github.com/apache/incubator-doris/pull/5010#discussion_r547071293
########## File path: docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md ########## @@ -215,11 +215,15 @@ TabletScheduler 里等待被调度的分片会根据状态不同,赋予不同 ## 副本均衡 -Doris 会自动进行集群内的副本均衡。均衡的主要思想,是对某些分片,先在低负载的节点上创建一个副本,然后再删除这些分片在高负载节点上的副本。同时,因为不同存储介质的存在,在同一个集群内的不同 BE 节点上,可能存在一种或两种存储介质。我们要求存储介质为 A 的分片在均衡后,尽量依然存储在存储介质 A 中。所以我们根据存储介质,对集群的 BE 节点进行划分。然后针对不同的存储介质的 BE 节点集合,进行负载均衡调度。 +Doris 会自动进行集群内的副本均衡。目前支持两种均衡策略,负载/分区。 Review comment: 这里应该告诉用户,什么情况下应该使用哪种均衡策略,每种均衡策略的优缺点是什么? ########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/MovesInProgressCache.java ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Maps; +import org.apache.doris.common.Pair; +import org.apache.doris.thrift.TStorageMedium; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class MovesInProgressCache { Review comment: Add a comment explaining what this cache stores. 
########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyAlgo.java ########## @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; +import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo; +import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo; +import org.apache.doris.common.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +// A two-dimensional greedy rebalancing algorithm. From among moves that +// decrease the skew of a most skewed partition, it prefers ones that reduce the +// skew of the cluster. A cluster is considered balanced when the skew of every +// partition is <= 1 and the skew of the cluster is <= 1. 
+// +// The skew of the cluster is defined as the difference between the maximum +// total replica count over all bes and the minimum total replica +// count over all bes. +public class TwoDimensionalGreedyAlgo { Review comment: 我觉得这个类名应该体现出 这是一个 rebalancing 相关的算法。 然后 注释 最好解释下 两维 是哪两维? ########## File path: docs/zh-CN/administrator-guide/operation/tablet-repair-and-balance.md ########## @@ -215,11 +215,15 @@ TabletScheduler 里等待被调度的分片会根据状态不同,赋予不同 ## 副本均衡 -Doris 会自动进行集群内的副本均衡。均衡的主要思想,是对某些分片,先在低负载的节点上创建一个副本,然后再删除这些分片在高负载节点上的副本。同时,因为不同存储介质的存在,在同一个集群内的不同 BE 节点上,可能存在一种或两种存储介质。我们要求存储介质为 A 的分片在均衡后,尽量依然存储在存储介质 A 中。所以我们根据存储介质,对集群的 BE 节点进行划分。然后针对不同的存储介质的 BE 节点集合,进行负载均衡调度。 +Doris 会自动进行集群内的副本均衡。目前支持两种均衡策略,负载/分区。 Review comment: 文档里最好说明 两种均衡策略是否可以随意替换。 ########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java ########## @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.clone; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.collect.TreeMultimap; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class PartitionRebalancer extends Rebalancer { Review comment: Could copy the description in the doc to this class comment. ########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/TwoDimensionalGreedyAlgo.java ########## @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.clone; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; +import org.apache.doris.catalog.TabletInvertedIndex.PartitionBalanceInfo; +import org.apache.doris.clone.PartitionRebalancer.ClusterBalanceInfo; +import org.apache.doris.common.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.NavigableSet; +import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + +// A two-dimensional greedy rebalancing algorithm. From among moves that +// decrease the skew of a most skewed partition, it prefers ones that reduce the +// skew of the cluster. A cluster is considered balanced when the skew of every +// partition is <= 1 and the skew of the cluster is <= 1. +// +// The skew of the cluster is defined as the difference between the maximum +// total replica count over all bes and the minimum total replica +// count over all bes. +public class TwoDimensionalGreedyAlgo { Review comment: 这里可以注明是 这个类是 inspire 或者modify from kudu. ########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java ########## @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.collect.TreeMultimap; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletInvertedIndex; +import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TStorageMedium; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +public class PartitionRebalancer extends Rebalancer { + private static final Logger LOG = LogManager.getLogger(PartitionRebalancer.class); + + private final TwoDimensionalGreedyAlgo algo = new TwoDimensionalGreedyAlgo(); + protected final MovesInProgressCache movesInProgressCache = new MovesInProgressCache(); + + private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0); + private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0); + + public PartitionRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) { + super(infoService, invertedIndex); + } + + @Override + protected List<TabletSchedCtx> selectAlternativeTabletsForCluster( + String 
clusterName, ClusterLoadStatistic clusterStat, TStorageMedium medium) { + MovesInProgressCache.Cell movesInProgress = movesInProgressCache.getCache(clusterName, medium); + Preconditions.checkNotNull(movesInProgress, "clusterStat is got from statisticMap, movesInProgressMap should have the same entry"); + + // iterating through cache.asMap().values() does not reset access time for the entries you retrieve. + List<ReplicaMove> movesInProgressList = movesInProgress.get().asMap().values() + .stream().map(p -> p.first).collect(Collectors.toList()); + List<Long> toDeleteKeys = Lists.newArrayList(); + + // The problematic movements will be found in buildClusterInfo(), so here is a simply move completion check + // of moves which have valid ToDeleteReplica. + List<ReplicaMove> movesNeedCheck = movesInProgress.get().asMap().values() + .stream().filter(p -> p.second != -1L).map(p -> p.first).collect(Collectors.toList()); + checkMovesCompleted(movesNeedCheck, toDeleteKeys); + + ClusterBalanceInfo clusterBalanceInfo = new ClusterBalanceInfo(); + // We should assume the in-progress moves have been succeeded to avoid producing the same moves. + // Apply in-progress moves to current cluster stats, use TwoDimensionalGreedyAlgo.ApplyMove for simplicity. + if (!buildClusterInfo(clusterStat, medium, movesInProgressList, clusterBalanceInfo, toDeleteKeys)) { + return Lists.newArrayList(); + } + + // Just delete the completed or problematic moves + if (!toDeleteKeys.isEmpty()) { + movesInProgress.get().invalidateAll(toDeleteKeys); + movesInProgressList = movesInProgressList.stream() + .filter(m -> !toDeleteKeys.contains(m.tabletId)).collect(Collectors.toList()); + } + + if (movesInProgressCache.size() > Config.max_balancing_tablets) { Review comment: 我们可以把move 这个概念完全去掉吗? 
改成 balancing 或者 balancing_tablets。 我们系统里面的概念应该尽可能少,保持统一。 ########## File path: fe/fe-core/src/main/java/org/apache/doris/clone/MovesInProgressCache.java ########## @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Maps; +import org.apache.doris.common.Pair; +import org.apache.doris.thrift.TStorageMedium; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class MovesInProgressCache { + // cluster -> medium -> moves in progress + private final Map<String, Map<TStorageMedium, Cell>> movesInProgressMap = Maps.newHashMap(); + + // TabletId -> Pair<Move, ToDeleteReplicaId>, 'ToDeleteReplicaId == -1' means this move haven't been scheduled successfully. 
+ public static class Cell { + Cache<Long, Pair<PartitionRebalancer.ReplicaMove, Long>> cache; + + Cell(long duration, TimeUnit unit) { + cache = CacheBuilder.newBuilder().expireAfterAccess(duration, unit).build(); + } + + public Cache<Long, Pair<PartitionRebalancer.ReplicaMove, Long>> get() { + return cache; + } + } + + public void updateCatalog(Map<String, ClusterLoadStatistic> statisticMap, long expireAfterAccessSecond) { + updateCatalog(statisticMap, expireAfterAccessSecond, TimeUnit.SECONDS); + } + + public void updateCatalog(Map<String, ClusterLoadStatistic> statisticMap, long duration, TimeUnit unit) { Review comment: This method may need a better name. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org