This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5280c18100a [improvement](query) prefer to chose tablet on alive disk (#39467) 5280c18100a is described below commit 5280c18100afac7e531a9443c03319a9df389ead Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Tue Aug 20 23:01:53 2024 +0800 [improvement](query) prefer to chose tablet on alive disk (#39467) improvement: 1. when querying, prefer to choose tablets on alive disks; 2. when a BE reports tablets, if the report version falls behind, try reporting again; 3. when a BE restarts, it reports its tablets and disks immediately instead of waiting 1 minute; 4. when the FE handles a tablet report, even if the report is stale, if there exist other healthy replicas and this replica is on a bad disk, still process this replica; --- be/src/agent/task_worker_pool.cpp | 19 ++--- be/src/service/doris_main.cpp | 2 + .../java/org/apache/doris/catalog/DiskInfo.java | 4 ++ .../main/java/org/apache/doris/catalog/Tablet.java | 28 +++++--- .../org/apache/doris/master/ReportHandler.java | 29 +++++++- .../org/apache/doris/planner/OlapScanNode.java | 14 +++- .../org/apache/doris/system/SystemInfoService.java | 2 +- .../org/apache/doris/catalog/QueryTabletTest.java | 84 ++++++++++++++++++++++ .../apache/doris/utframe/MockedBackendFactory.java | 31 ++++++++ 9 files changed, 191 insertions(+), 22 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index a02f1761463..27921888774 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1009,13 +1009,6 @@ void report_task_callback(const TMasterInfo& master_info) { } void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info) { - // Random sleep 1~5 seconds before doing report. - // In order to avoid the problem that the FE receives many report requests at the same time - // and can not be processed. 
- if (config::report_random_wait) { - random_sleep(5); - } - TReportRequest request; request.__set_backend(BackendOptions::get_local_backend()); request.__isset.disks = true; @@ -1081,8 +1074,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf request.__set_backend(BackendOptions::get_local_backend()); request.__isset.tablets = true; - uint64_t report_version = s_report_version; - engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); + uint64_t report_version; + for (int i = 0; i < 5; i++) { + request.tablets.clear(); + report_version = s_report_version; + engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); + if (report_version == s_report_version) { + break; + } + } + if (report_version < s_report_version) { // TODO llj This can only reduce the possibility for report error, but can't avoid it. // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 92d3452dcb1..dcc76259868 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -602,6 +602,8 @@ int main(int argc, char** argv) { stop_work_if_error( status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string()); + exec_env->storage_engine().notify_listeners(); + while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index 934e7f75fb0..38d8037befc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -151,6 +151,10 @@ public class DiskInfo implements Writable { return pathHash != 0; } + public boolean isAlive() { + return state == DiskState.ONLINE; + } + public boolean 
isStorageMediumMatch(TStorageMedium storageMedium) { return this.storageMedium == storageMedium; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 68aa70a4039..4102f4f117e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -304,9 +304,11 @@ public class Tablet extends MetaObject { } // for query - public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) { + public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Long>> backendAlivePathHashs, + boolean allowFailedVersion) { List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size()); List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size()); + List<Replica> deadPathReplica = Lists.newArrayList(); for (Replica replica : replicas) { if (replica.isBad()) { continue; @@ -317,21 +319,31 @@ public class Tablet extends MetaObject { continue; } + if (!replica.checkVersionCatchUp(visibleVersion, false)) { + continue; + } + + Set<Long> thisBeAlivePaths = backendAlivePathHashs.get(replica.getBackendId()); ReplicaState state = replica.getState(); - if (state.canQuery()) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - allQueryableReplica.add(replica); - } + // if thisBeAlivePaths contains pathHash = 0, it mean this be hadn't report disks state. + // should ignore this case. 
+ if (replica.getPathHash() != -1 && thisBeAlivePaths != null + && !thisBeAlivePaths.contains(replica.getPathHash()) + && !thisBeAlivePaths.contains(0L)) { + deadPathReplica.add(replica); + } else if (state.canQuery()) { + allQueryableReplica.add(replica); } else if (state == ReplicaState.DECOMMISSION) { - if (replica.checkVersionCatchUp(visibleVersion, false)) { - auxiliaryReplica.add(replica); - } + auxiliaryReplica.add(replica); } } if (allQueryableReplica.isEmpty()) { allQueryableReplica = auxiliaryReplica; } + if (allQueryableReplica.isEmpty()) { + allQueryableReplica = deadPathReplica; + } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount) diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 89bc9a6e522..06b560ab362 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.BinlogConfig; import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.MaterializedIndex; @@ -822,6 +823,15 @@ public class ReportHandler extends Daemon { AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); Map<Object, Object> objectPool = new HashMap<Object, Object>(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + Set<Long> backendHealthPathHashs; + if (backend == null) { + backendHealthPathHashs = Sets.newHashSet(); + } else { + backendHealthPathHashs = 
backend.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .map(DiskInfo::getPathHash).collect(Collectors.toSet()); + } for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -877,7 +887,24 @@ public class ReportHandler extends Daemon { long currentBackendReportVersion = Env.getCurrentSystemInfo() .getBackendReportVersion(backendId); if (backendReportVersion < currentBackendReportVersion) { - continue; + + // if backendHealthPathHashs contains health path hash 0, + // it means this backend hadn't reported disks state, + // should ignore this case. + boolean thisReplicaOnBadDisk = replica.getPathHash() != -1L + && !backendHealthPathHashs.contains(replica.getPathHash()) + && !backendHealthPathHashs.contains(0L); + + boolean existsOtherHealthReplica = tablet.getReplicas().stream() + .anyMatch(r -> r.getBackendId() != replica.getBackendId() + && r.getVersion() >= replica.getVersion() + && r.getLastFailedVersion() == -1L + && !r.isBad()); + + // if replica is on bad disks and there are other health replicas, still delete it. 
+ if (!(thisReplicaOnBadDisk && existsOtherHealthReplica)) { + continue; + } } BinlogConfig binlogConfig = new BinlogConfig(olapTable.getBinlogConfig()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index dffbba37cfe..8e5ab5cdf0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; @@ -751,7 +752,7 @@ public class OlapScanNode extends ScanNode { } private void addScanRangeLocations(Partition partition, - List<Tablet> tablets) throws UserException { + List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) throws UserException { long visibleVersion = Partition.PARTITION_INIT_VERSION; // For cloud mode, set scan range visible version in Coordinator.exec so that we could @@ -804,7 +805,8 @@ public class OlapScanNode extends ScanNode { // // ATTN: visibleVersion is not used in cloud mode, see CloudReplica.checkVersionCatchup // for details. 
- List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion); + List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, + backendAlivePathHashs, skipMissingVersion); if (replicas.isEmpty()) { if (ConnectContext.get().getSessionVariable().skipBadTablet) { continue; @@ -1168,6 +1170,12 @@ public class OlapScanNode extends ScanNode { */ Preconditions.checkState(scanBackendIds.size() == 0); Preconditions.checkState(scanTabletIds.size() == 0); + Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + for (Long partitionId : selectedPartitionIds) { final Partition partition = olapTable.getPartition(partitionId); final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); @@ -1209,7 +1217,7 @@ public class OlapScanNode extends ScanNode { totalTabletsNum += selectedTable.getTablets().size(); selectedSplitNum += tablets.size(); - addScanRangeLocations(partition, tablets); + addScanRangeLocations(partition, tablets, backendAlivePathHashs); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 836d516c942..f81d8b4d7b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -799,7 +799,7 @@ public class SystemInfoService { } } - private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() { + public ImmutableMap<Long, Backend> getAllClusterBackendsNoException() { try { return getAllBackendsByAllCluster(); } catch (AnalysisException e) { diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java new file mode 100644 index 00000000000..32929523a53 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog; + +import org.apache.doris.system.Backend; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class QueryTabletTest extends TestWithFeService { + + @Override + protected int backendNum() { + return 3; + } + + @Test + public void testTabletOnBadDisks() throws Exception { + createDatabase("db1"); + createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 1" + + " properties('replication_num' = '3')"); + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("db1"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1"); + Assertions.assertNotNull(tbl); + Tablet tablet = tbl.getPartitions().iterator().next() + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + List<Replica> replicas = tablet.getReplicas(); + Assertions.assertEquals(3, replicas.size()); + for (Replica replica : replicas) { + Assertions.assertTrue(replica.getPathHash() != -1L); + } + + Assertions.assertEquals(replicas, + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + + // disk mark as bad + Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId()) + .getDisks().values().forEach(disk -> disk.setState(DiskInfo.DiskState.OFFLINE)); + + // lost disk + replicas.get(1).setPathHash(-123321L); + + Assertions.assertEquals(Lists.newArrayList(replicas.get(2)), + tablet.getQueryableReplicas(1L, getAlivePathHashs(), false)); + } + + private Map<Long, Set<Long>> getAlivePathHashs() { + Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap(); + for (Backend backend : Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) { + 
backendAlivePathHashs.put(backend.getId(), backend.getDisks().values().stream() + .filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet())); + } + + return backendAlivePathHashs; + } + +} + diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 3934e140f67..9e8ff913ada 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -41,6 +41,7 @@ import org.apache.doris.thrift.TCheckStorageFormatResult; import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest; import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse; import org.apache.doris.thrift.TCloneReq; +import org.apache.doris.thrift.TCreateTabletReq; import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TDropTabletReq; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -95,7 +96,9 @@ import org.apache.thrift.TException; import java.io.IOException; import java.util.List; +import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; /* * This class is used to create mock backends. 
@@ -203,6 +206,9 @@ public class MockedBackendFactory { TTaskType taskType = request.getTaskType(); switch (taskType) { case CREATE: + ++reportVersion; + handleCreateTablet(request, finishTaskRequest); + break; case ALTER: ++reportVersion; break; @@ -210,6 +216,7 @@ public class MockedBackendFactory { handleDropTablet(request, finishTaskRequest); break; case CLONE: + ++reportVersion; handleCloneTablet(request, finishTaskRequest); break; case STORAGE_MEDIUM_MIGRATE: @@ -235,6 +242,30 @@ public class MockedBackendFactory { } } + private void handleCreateTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { + TCreateTabletReq req = request.getCreateTabletReq(); + List<DiskInfo> candDisks = backendInFe.getDisks().values().stream() + .filter(disk -> req.storage_medium == disk.getStorageMedium() && disk.isAlive()) + .collect(Collectors.toList()); + if (candDisks.isEmpty()) { + candDisks = backendInFe.getDisks().values().stream() + .filter(DiskInfo::isAlive) + .collect(Collectors.toList()); + } + DiskInfo choseDisk = candDisks.isEmpty() ? null + : candDisks.get(new Random().nextInt(candDisks.size())); + + List<TTabletInfo> tabletInfos = Lists.newArrayList(); + TTabletInfo tabletInfo = new TTabletInfo(); + tabletInfo.setTabletId(req.tablet_id); + tabletInfo.setVersion(req.version); + tabletInfo.setPathHash(choseDisk == null ? -1L : choseDisk.getPathHash()); + tabletInfo.setReplicaId(req.replica_id); + tabletInfo.setUsed(true); + tabletInfos.add(tabletInfo); + finishTaskRequest.setFinishTabletInfos(tabletInfos); + } + private void handleDropTablet(TAgentTaskRequest request, TFinishTaskRequest finishTaskRequest) { TDropTabletReq req = request.getDropTabletReq(); long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org