This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 908dff551aba955073e9692f6ea6f0f00718cd99 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Thu Mar 7 21:25:51 2024 +0800 [fix](statistics)Add synchronize for modify analysisTaskInfoMap and analysisJobInfoMap. #31940 --- .../apache/doris/statistics/AnalysisManager.java | 48 ++++++++++++++-------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index b265b88f702..6bdf6fdb771 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -591,14 +591,16 @@ public class AnalysisManager implements Writable { tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl()); } long tblId = tbl == null ? -1 : tbl.getId(); - return analysisInfos.stream() + synchronized (analysisInfos) { + return analysisInfos.stream() .filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId()) .filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state))) .filter(a -> tblName == null || a.tblId == tblId) .filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM) - || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL)) + || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL)) .sorted(Comparator.comparingLong(a -> a.jobId)) .collect(Collectors.toList()); + } } public String getJobProgress(long jobId) { @@ -796,31 +798,39 @@ public class AnalysisManager implements Writable { } public void replayCreateAnalysisJob(AnalysisInfo jobInfo) { - while (analysisJobInfoMap.size() >= Config.analyze_record_limit) { - analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey()); - } - if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { - jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + synchronized (analysisJobInfoMap) { + while (analysisJobInfoMap.size() >= Config.analyze_record_limit) { + analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey()); + } + if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { + jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + } + this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo); } - this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo); } public void replayCreateAnalysisTask(AnalysisInfo taskInfo) { - while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) { - analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey()); - } - if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { - taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + synchronized (analysisTaskInfoMap) { + while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) { + analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey()); + } + if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { + taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + } + this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo); } - this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo); } public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) { - this.analysisJobInfoMap.remove(log.id); + synchronized (analysisJobInfoMap) { + this.analysisJobInfoMap.remove(log.id); + } } public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) { - this.analysisTaskInfoMap.remove(log.id); + synchronized (analysisTaskInfoMap) { + this.analysisTaskInfoMap.remove(log.id); + } } private static class SyncTaskCollection { @@ -892,8 +902,10 @@ public class AnalysisManager implements Writable { } public void removeAll(List<AnalysisInfo> analysisInfos) { - for (AnalysisInfo analysisInfo : analysisInfos) { - analysisTaskInfoMap.remove(analysisInfo.taskId); + synchronized (analysisTaskInfoMap) { + for (AnalysisInfo analysisInfo : analysisInfos) { + analysisTaskInfoMap.remove(analysisInfo.taskId); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org