This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 607887673eb [improvement](report) report handler discard old report 
tasks #39469 (#39605)
607887673eb is described below

commit 607887673eb638f765c474971859b2b124226028
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Tue Aug 20 17:40:49 2024 +0800

    [improvement](report) report handler discard old report tasks #39469 
(#39605)
    
    cherry pick from #39469
---
 .../org/apache/doris/master/ReportHandler.java     | 90 ++++++++++++++++++----
 1 file changed, 76 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 29887658f7a..af62171007b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -104,6 +104,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.stream.Collectors;
@@ -111,10 +112,11 @@ import java.util.stream.Collectors;
 public class ReportHandler extends Daemon {
     private static final Logger LOG = 
LogManager.getLogger(ReportHandler.class);
 
-    private BlockingQueue<ReportTask> reportQueue = 
Queues.newLinkedBlockingQueue();
+    private BlockingQueue<BackendReportType> reportQueue = 
Queues.newLinkedBlockingQueue();
+
+    private Map<BackendReportType, ReportTask> reportTasks = Maps.newHashMap();
 
     private enum ReportType {
-        UNKNOWN,
         TASK,
         DISK,
         TABLET
@@ -158,7 +160,7 @@ public class ReportHandler extends Daemon {
         Map<Long, Long> partitionsVersion = null;
         long reportVersion = -1;
 
-        ReportType reportType = ReportType.UNKNOWN;
+        ReportType reportType = null;
 
         if (request.isSetTasks()) {
             tasks = request.getTasks();
@@ -189,8 +191,16 @@ public class ReportHandler extends Daemon {
             
backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore());
         }
 
-        ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, 
partitionsVersion, reportVersion,
-                request.getStoragePolicy(), request.getResource(), 
request.getNumCores(),
+        if (reportType == null) {
+            tStatus.setStatusCode(TStatusCode.INTERNAL_ERROR);
+            tStatus.setErrorMsgs(Lists.newArrayList("unknown report type"));
+            LOG.error("receive unknown report type from be {}. current queue 
size: {}",
+                    backend.getId(), reportQueue.size());
+            return result;
+        }
+
+        ReportTask reportTask = new ReportTask(beId, reportType, tasks, disks, 
tablets, partitionsVersion,
+                reportVersion, request.getStoragePolicy(), 
request.getResource(), request.getNumCores(),
                 request.getPipelineExecutorSize());
         try {
             putToQueue(reportTask);
@@ -202,8 +212,8 @@ public class ReportHandler extends Daemon {
             tStatus.setErrorMsgs(errorMsgs);
             return result;
         }
-        LOG.info("receive report from be {}. type: {}, current queue size: {}",
-                backend.getId(), reportType, reportQueue.size());
+        LOG.info("receive report from be {}. type: {}, report version {}, 
current queue size: {}",
+                backend.getId(), reportType, reportVersion, 
reportQueue.size());
         return result;
     }
 
@@ -215,7 +225,14 @@ public class ReportHandler extends Daemon {
                     "the report queue size exceeds the limit: "
                             + Config.report_queue_size + ". current: " + 
currentSize);
         }
-        reportQueue.put(reportTask);
+
+        BackendReportType backendReportType = new 
BackendReportType(reportTask.beId, reportTask.reportType);
+
+        synchronized (reportTasks) {
+            reportTasks.put(backendReportType, reportTask);
+        }
+
+        reportQueue.put(backendReportType);
     }
 
     private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
@@ -230,9 +247,38 @@ public class ReportHandler extends Daemon {
         return tabletMap;
     }
 
+    private class BackendReportType {
+        private long beId;
+        private ReportType reportType;
+
+        public BackendReportType(long beId, ReportType reportType) {
+            this.beId = beId;
+            this.reportType = reportType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(beId, reportType);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof BackendReportType)) {
+                return false;
+            }
+            BackendReportType otherBeReport = (BackendReportType) other;
+            return this.beId == otherBeReport.beId
+                && this.reportType == otherBeReport.reportType;
+        }
+    }
+
     private class ReportTask extends MasterTask {
 
         private long beId;
+        private ReportType reportType;
         private Map<TTaskType, Set<Long>> tasks;
         private Map<String, TDisk> disks;
         private Map<Long, TTablet> tablets;
@@ -244,12 +290,13 @@ public class ReportHandler extends Daemon {
         private int cpuCores;
         private int pipelineExecutorSize;
 
-        public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
+        public ReportTask(long beId, ReportType reportType, Map<TTaskType, 
Set<Long>> tasks,
                 Map<String, TDisk> disks, Map<Long, TTablet> tablets,
                 Map<Long, Long> partitionsVersion, long reportVersion,
                 List<TStoragePolicy> storagePolicies, List<TStorageResource> 
storageResources, int cpuCores,
                 int pipelineExecutorSize) {
             this.beId = beId;
+            this.reportType = reportType;
             this.tasks = tasks;
             this.disks = disks;
             this.tablets = tablets;
@@ -1383,13 +1430,28 @@ public class ReportHandler extends Daemon {
     @Override
     protected void runOneCycle() {
         while (true) {
-            ReportTask task = null;
-            try {
-                task = reportQueue.take();
+            ReportTask task = takeReportTask();
+            if (task != null) {
                 task.exec();
-            } catch (InterruptedException e) {
-                LOG.warn("got interupted exception when executing report", e);
             }
         }
     }
+
+    private ReportTask takeReportTask() {
+        BackendReportType backendReportType;
+        try {
+            backendReportType = reportQueue.take();
+        } catch (InterruptedException e) {
+            LOG.warn("got interupted exception when executing report", e);
+            return null;
+        }
+
+        ReportTask task = null;
+        synchronized (reportTasks) {
+            task = reportTasks.get(backendReportType);
+            reportTasks.remove(backendReportType);
+        }
+
+        return task;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to