This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5280c18100a [improvement](query) prefer to chose tablet on alive disk 
(#39467)
5280c18100a is described below

commit 5280c18100afac7e531a9443c03319a9df389ead
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Tue Aug 20 23:01:53 2024 +0800

    [improvement](query) prefer to chose tablet on alive disk (#39467)
    
    improvement:
    1. When querying, prefer to choose tablets on alive disks;
    2. When a BE reports its tablets, if the report version has fallen behind,
    try the report again;
    3. When a BE restarts, it reports its tablets and disks immediately instead
    of waiting 1 min;
    4. When the FE handles a tablet report, even if the report is stale, if
    there exist other healthy replicas and this tablet is on a bad disk, still
    process this tablet;
---
 be/src/agent/task_worker_pool.cpp                  | 19 ++---
 be/src/service/doris_main.cpp                      |  2 +
 .../java/org/apache/doris/catalog/DiskInfo.java    |  4 ++
 .../main/java/org/apache/doris/catalog/Tablet.java | 28 +++++---
 .../org/apache/doris/master/ReportHandler.java     | 29 +++++++-
 .../org/apache/doris/planner/OlapScanNode.java     | 14 +++-
 .../org/apache/doris/system/SystemInfoService.java |  2 +-
 .../org/apache/doris/catalog/QueryTabletTest.java  | 84 ++++++++++++++++++++++
 .../apache/doris/utframe/MockedBackendFactory.java | 31 ++++++++
 9 files changed, 191 insertions(+), 22 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index a02f1761463..27921888774 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1009,13 +1009,6 @@ void report_task_callback(const TMasterInfo& 
master_info) {
 }
 
 void report_disk_callback(StorageEngine& engine, const TMasterInfo& 
master_info) {
-    // Random sleep 1~5 seconds before doing report.
-    // In order to avoid the problem that the FE receives many report requests 
at the same time
-    // and can not be processed.
-    if (config::report_random_wait) {
-        random_sleep(5);
-    }
-
     TReportRequest request;
     request.__set_backend(BackendOptions::get_local_backend());
     request.__isset.disks = true;
@@ -1081,8 +1074,16 @@ void report_tablet_callback(StorageEngine& engine, const 
TMasterInfo& master_inf
     request.__set_backend(BackendOptions::get_local_backend());
     request.__isset.tablets = true;
 
-    uint64_t report_version = s_report_version;
-    engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
+    uint64_t report_version;
+    for (int i = 0; i < 5; i++) {
+        request.tablets.clear();
+        report_version = s_report_version;
+        
engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
+        if (report_version == s_report_version) {
+            break;
+        }
+    }
+
     if (report_version < s_report_version) {
         // TODO llj This can only reduce the possibility for report error, but 
can't avoid it.
         // If FE create a tablet in FE meta and send CREATE task to this BE, 
the tablet may not be included in this
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 92d3452dcb1..dcc76259868 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -602,6 +602,8 @@ int main(int argc, char** argv) {
     stop_work_if_error(
             status, "Arrow Flight Service did not start correctly, exiting, " 
+ status.to_string());
 
+    exec_env->storage_engine().notify_listeners();
+
     while (!doris::k_doris_exit) {
 #if defined(LEAK_SANITIZER)
         __lsan_do_leak_check();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index 934e7f75fb0..38d8037befc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -151,6 +151,10 @@ public class DiskInfo implements Writable {
         return pathHash != 0;
     }
 
+    public boolean isAlive() {
+        return state == DiskState.ONLINE;
+    }
+
     public boolean isStorageMediumMatch(TStorageMedium storageMedium) {
         return this.storageMedium == storageMedium;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 68aa70a4039..4102f4f117e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -304,9 +304,11 @@ public class Tablet extends MetaObject {
     }
 
     // for query
-    public List<Replica> getQueryableReplicas(long visibleVersion, boolean 
allowFailedVersion) {
+    public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, 
Set<Long>> backendAlivePathHashs,
+            boolean allowFailedVersion) {
         List<Replica> allQueryableReplica = 
Lists.newArrayListWithCapacity(replicas.size());
         List<Replica> auxiliaryReplica = 
Lists.newArrayListWithCapacity(replicas.size());
+        List<Replica> deadPathReplica = Lists.newArrayList();
         for (Replica replica : replicas) {
             if (replica.isBad()) {
                 continue;
@@ -317,21 +319,31 @@ public class Tablet extends MetaObject {
                 continue;
             }
 
+            if (!replica.checkVersionCatchUp(visibleVersion, false)) {
+                continue;
+            }
+
+            Set<Long> thisBeAlivePaths = 
backendAlivePathHashs.get(replica.getBackendId());
             ReplicaState state = replica.getState();
-            if (state.canQuery()) {
-                if (replica.checkVersionCatchUp(visibleVersion, false)) {
-                    allQueryableReplica.add(replica);
-                }
+            // if thisBeAlivePaths contains pathHash = 0, it mean this be 
hadn't report disks state.
+            // should ignore this case.
+            if (replica.getPathHash() != -1 && thisBeAlivePaths != null
+                    && !thisBeAlivePaths.contains(replica.getPathHash())
+                    && !thisBeAlivePaths.contains(0L)) {
+                deadPathReplica.add(replica);
+            } else if (state.canQuery()) {
+                allQueryableReplica.add(replica);
             } else if (state == ReplicaState.DECOMMISSION) {
-                if (replica.checkVersionCatchUp(visibleVersion, false)) {
-                    auxiliaryReplica.add(replica);
-                }
+                auxiliaryReplica.add(replica);
             }
         }
 
         if (allQueryableReplica.isEmpty()) {
             allQueryableReplica = auxiliaryReplica;
         }
+        if (allQueryableReplica.isEmpty()) {
+            allQueryableReplica = deadPathReplica;
+        }
 
         if (Config.skip_compaction_slower_replica && 
allQueryableReplica.size() > 1) {
             long minVersionCount = 
allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 89bc9a6e522..06b560ab362 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.MaterializedIndex;
@@ -822,6 +823,15 @@ public class ReportHandler extends Daemon {
         AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
         Map<Object, Object> objectPool = new HashMap<Object, Object>();
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+        Set<Long> backendHealthPathHashs;
+        if (backend == null) {
+            backendHealthPathHashs = Sets.newHashSet();
+        } else {
+            backendHealthPathHashs = backend.getDisks().values().stream()
+                    .filter(DiskInfo::isAlive)
+                    .map(DiskInfo::getPathHash).collect(Collectors.toSet());
+        }
         for (Long dbId : tabletDeleteFromMeta.keySet()) {
             Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
             if (db == null) {
@@ -877,7 +887,24 @@ public class ReportHandler extends Daemon {
                     long currentBackendReportVersion = 
Env.getCurrentSystemInfo()
                             .getBackendReportVersion(backendId);
                     if (backendReportVersion < currentBackendReportVersion) {
-                        continue;
+
+                        // if backendHealthPathHashs contains health path hash 
0,
+                        // it means this backend hadn't reported disks state,
+                        // should ignore this case.
+                        boolean thisReplicaOnBadDisk = replica.getPathHash() 
!= -1L
+                                && 
!backendHealthPathHashs.contains(replica.getPathHash())
+                                && !backendHealthPathHashs.contains(0L);
+
+                        boolean existsOtherHealthReplica = 
tablet.getReplicas().stream()
+                                .anyMatch(r -> r.getBackendId() != 
replica.getBackendId()
+                                        && r.getVersion() >= 
replica.getVersion()
+                                        && r.getLastFailedVersion() == -1L
+                                        && !r.isBad());
+
+                        // if replica is on bad disks and there are other 
health replicas, still delete it.
+                        if (!(thisReplicaOnBadDisk && 
existsOtherHealthReplica)) {
+                            continue;
+                        }
                     }
 
                     BinlogConfig binlogConfig = new 
BinlogConfig(olapTable.getBinlogConfig());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index dffbba37cfe..8e5ab5cdf0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HashDistributionInfo;
@@ -751,7 +752,7 @@ public class OlapScanNode extends ScanNode {
     }
 
     private void addScanRangeLocations(Partition partition,
-            List<Tablet> tablets) throws UserException {
+            List<Tablet> tablets, Map<Long, Set<Long>> backendAlivePathHashs) 
throws UserException {
         long visibleVersion = Partition.PARTITION_INIT_VERSION;
 
         // For cloud mode, set scan range visible version in Coordinator.exec 
so that we could
@@ -804,7 +805,8 @@ public class OlapScanNode extends ScanNode {
             //
             // ATTN: visibleVersion is not used in cloud mode, see 
CloudReplica.checkVersionCatchup
             // for details.
-            List<Replica> replicas = 
tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
+            List<Replica> replicas = 
tablet.getQueryableReplicas(visibleVersion,
+                    backendAlivePathHashs, skipMissingVersion);
             if (replicas.isEmpty()) {
                 if (ConnectContext.get().getSessionVariable().skipBadTablet) {
                     continue;
@@ -1168,6 +1170,12 @@ public class OlapScanNode extends ScanNode {
          */
         Preconditions.checkState(scanBackendIds.size() == 0);
         Preconditions.checkState(scanTabletIds.size() == 0);
+        Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+        for (Backend backend : 
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+            backendAlivePathHashs.put(backend.getId(), 
backend.getDisks().values().stream()
+                    
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+        }
+
         for (Long partitionId : selectedPartitionIds) {
             final Partition partition = olapTable.getPartition(partitionId);
             final MaterializedIndex selectedTable = 
partition.getIndex(selectedIndexId);
@@ -1209,7 +1217,7 @@ public class OlapScanNode extends ScanNode {
 
             totalTabletsNum += selectedTable.getTablets().size();
             selectedSplitNum += tablets.size();
-            addScanRangeLocations(partition, tablets);
+            addScanRangeLocations(partition, tablets, backendAlivePathHashs);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 836d516c942..f81d8b4d7b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -799,7 +799,7 @@ public class SystemInfoService {
         }
     }
 
-    private ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
+    public ImmutableMap<Long, Backend> getAllClusterBackendsNoException() {
         try {
             return getAllBackendsByAllCluster();
         } catch (AnalysisException e) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
new file mode 100644
index 00000000000..32929523a53
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/QueryTabletTest.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class QueryTabletTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 3;
+    }
+
+    @Test
+    public void testTabletOnBadDisks() throws Exception {
+        createDatabase("db1");
+        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) 
buckets 1"
+                + " properties('replication_num' = '3')");
+
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("db1");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+        Assertions.assertNotNull(tbl);
+        Tablet tablet = tbl.getPartitions().iterator().next()
+                
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets().iterator().next();
+
+        List<Replica> replicas = tablet.getReplicas();
+        Assertions.assertEquals(3, replicas.size());
+        for (Replica replica : replicas) {
+            Assertions.assertTrue(replica.getPathHash() != -1L);
+        }
+
+        Assertions.assertEquals(replicas,
+                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+
+        // disk mark as bad
+        Env.getCurrentSystemInfo().getBackend(replicas.get(0).getBackendId())
+                .getDisks().values().forEach(disk -> 
disk.setState(DiskInfo.DiskState.OFFLINE));
+
+        // lost disk
+        replicas.get(1).setPathHash(-123321L);
+
+        Assertions.assertEquals(Lists.newArrayList(replicas.get(2)),
+                tablet.getQueryableReplicas(1L, getAlivePathHashs(), false));
+    }
+
+    private Map<Long, Set<Long>> getAlivePathHashs() {
+        Map<Long, Set<Long>> backendAlivePathHashs = Maps.newHashMap();
+        for (Backend backend : 
Env.getCurrentSystemInfo().getAllClusterBackendsNoException().values()) {
+            backendAlivePathHashs.put(backend.getId(), 
backend.getDisks().values().stream()
+                    
.filter(DiskInfo::isAlive).map(DiskInfo::getPathHash).collect(Collectors.toSet()));
+        }
+
+        return backendAlivePathHashs;
+    }
+
+}
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 3934e140f67..9e8ff913ada 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -41,6 +41,7 @@ import org.apache.doris.thrift.TCheckStorageFormatResult;
 import org.apache.doris.thrift.TCheckWarmUpCacheAsyncRequest;
 import org.apache.doris.thrift.TCheckWarmUpCacheAsyncResponse;
 import org.apache.doris.thrift.TCloneReq;
+import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TDiskTrashInfo;
 import org.apache.doris.thrift.TDropTabletReq;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -95,7 +96,9 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
 
 /*
  * This class is used to create mock backends.
@@ -203,6 +206,9 @@ public class MockedBackendFactory {
                             TTaskType taskType = request.getTaskType();
                             switch (taskType) {
                                 case CREATE:
+                                    ++reportVersion;
+                                    handleCreateTablet(request, 
finishTaskRequest);
+                                    break;
                                 case ALTER:
                                     ++reportVersion;
                                     break;
@@ -210,6 +216,7 @@ public class MockedBackendFactory {
                                     handleDropTablet(request, 
finishTaskRequest);
                                     break;
                                 case CLONE:
+                                    ++reportVersion;
                                     handleCloneTablet(request, 
finishTaskRequest);
                                     break;
                                 case STORAGE_MEDIUM_MIGRATE:
@@ -235,6 +242,30 @@ public class MockedBackendFactory {
                     }
                 }
 
+                private void handleCreateTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
+                    TCreateTabletReq req = request.getCreateTabletReq();
+                    List<DiskInfo> candDisks = 
backendInFe.getDisks().values().stream()
+                            .filter(disk -> req.storage_medium == 
disk.getStorageMedium() && disk.isAlive())
+                            .collect(Collectors.toList());
+                    if (candDisks.isEmpty()) {
+                        candDisks = backendInFe.getDisks().values().stream()
+                                .filter(DiskInfo::isAlive)
+                                .collect(Collectors.toList());
+                    }
+                    DiskInfo choseDisk = candDisks.isEmpty() ? null
+                            : candDisks.get(new 
Random().nextInt(candDisks.size()));
+
+                    List<TTabletInfo> tabletInfos = Lists.newArrayList();
+                    TTabletInfo tabletInfo = new TTabletInfo();
+                    tabletInfo.setTabletId(req.tablet_id);
+                    tabletInfo.setVersion(req.version);
+                    tabletInfo.setPathHash(choseDisk == null ? -1L : 
choseDisk.getPathHash());
+                    tabletInfo.setReplicaId(req.replica_id);
+                    tabletInfo.setUsed(true);
+                    tabletInfos.add(tabletInfo);
+                    finishTaskRequest.setFinishTabletInfos(tabletInfos);
+                }
+
                 private void handleDropTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
                     TDropTabletReq req = request.getDropTabletReq();
                     long dataSize = Math.max(1, 
CatalogTestUtil.getTabletDataSize(req.tablet_id));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to