yiguolei commented on code in PR #33690: URL: https://github.com/apache/doris/pull/33690#discussion_r1689204876
########## fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java: ########## @@ -591,9 +540,351 @@ public void cleanProfile() { writeLock.lock(); try { queryIdToProfileMap.clear(); - queryIdDeque.clear(); + queryIdToExecutionProfiles.clear(); } finally { writeLock.unlock(); } } + + @Override + protected void runAfterCatalogReady() { + loadProfilesFromStorageIfFirstTime(); + writeProfileToStorage(); + deleteBrokenProfiles(); + deleteOutdatedProfilesFromStorage(); + } + + // List PROFILE_STORAGE_PATH and return all dir names + // string will contain profile id and its storage timestamp + private List<String> getOnStorageProfileInfos() { + List<String> res = Lists.newArrayList(); + try { + File profileDir = new File(PROFILE_STORAGE_PATH); + if (!profileDir.exists()) { + LOG.warn("Profile storage directory {} does not exist", PROFILE_STORAGE_PATH); + return res; + } + + File[] files = profileDir.listFiles(); + for (File file : files) { + res.add(file.getAbsolutePath()); + } + } catch (Exception e) { + LOG.error("Failed to get profile meta from storage", e); + } + + return res; + } + + // read profile file on storage + // deserialize to an object Profile + // push them to memory structure of ProfileManager for index + private void loadProfilesFromStorageIfFirstTime() { + if (this.isProfileLoaded) { + return; + } + + try { + LOG.info("Reading profile from {}", PROFILE_STORAGE_PATH); + List<String> profileDirAbsPaths = getOnStorageProfileInfos(); + // Thread safe list + List<Profile> profiles = Lists.newCopyOnWriteArrayList(); + + List<Thread> profileReadThreads = Lists.newArrayList(); + + for (String profileDirAbsPath : profileDirAbsPaths) { + Thread thread = new Thread(() -> { + Profile profile = Profile.read(profileDirAbsPath); + if (profile != null) { + profiles.add(profile); + } + }); + thread.start(); + profileReadThreads.add(thread); + } + + for (Thread thread : profileReadThreads) { + thread.join(); + } + + LOG.info("There are {} profiles 
loaded into memory", profiles.size()); + + // there may already has some queries running before this thread running + // so we should not clear current memory structure + + for (Profile profile : profiles) { + this.pushProfile(profile); + } + + this.isProfileLoaded = true; + } catch (Exception e) { + LOG.error("Failed to load query profile from storage", e); + } + } + + private void createProfileStorageDirIfNecessary() { + File profileDir = new File(PROFILE_STORAGE_PATH); + if (profileDir.exists()) { + return; + } + + // create query_id directory + if (!profileDir.mkdir()) { + LOG.warn("create profile directory {} failed", profileDir.getAbsolutePath()); + } else { + LOG.info("Create profile storage {} succeed", PROFILE_STORAGE_PATH); + } + } + + private List<ProfileElement> getProfilesNeedStore() { + List<ProfileElement> profilesToBeStored = Lists.newArrayList(); + + queryIdToProfileMap.forEach((queryId, profileElement) -> { + if (profileElement.profile.shouldStoreToStorage()) { + profilesToBeStored.add(profileElement); + } + }); + + return profilesToBeStored; + } + + // Collect profiles that need to be stored to storage + // Store them to storage + // Release the memory + private void writeProfileToStorage() { + try { + if (Strings.isNullOrEmpty(PROFILE_STORAGE_PATH)) { + LOG.error("Logical error, PROFILE_STORAGE_PATH is empty"); + return; + } + + createProfileStorageDirIfNecessary(); + List<ProfileElement> profilesToBeStored = Lists.newArrayList(); + + readLock.lock(); + try { + profilesToBeStored = getProfilesNeedStore(); + } finally { + readLock.unlock(); + } + + // Store profile to storage in parallel + List<Thread> iothreads = Lists.newArrayList(); + + for (ProfileElement profileElement : profilesToBeStored) { + Thread thread = new Thread(() -> { + profileElement.writeToStorage(PROFILE_STORAGE_PATH); + }); + iothreads.add(thread); + thread.start(); + } + + for (Thread thread : iothreads) { + thread.join(); + } + + // After profile is stored to storage, the 
executoin profile must be ejected from memory + // or the memory will be exhausted + + writeLock.lock(); + try { + for (ProfileElement profileElement : profilesToBeStored) { + for (ExecutionProfile executionProfile : profileElement.profile.getExecutionProfiles()) { + this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId()); + } + profileElement.profile.releaseExecutionProfile(); + } + } finally { + writeLock.unlock(); + } + } catch (Exception e) { + LOG.error("Failed to remove query profile", e); + } + } + + private List<ProfileElement> getProfilesToBeRemoved() { + final int maxProfilesOnStorage = Config.max_spilled_profile_num; + // By order of query finish timestamp + // The profile with the least storage timestamp will be on the top of heap + PriorityQueue<ProfileElement> profileDeque = new PriorityQueue<>(Comparator.comparingLong( + (ProfileElement profileElement) -> profileElement.profile.getQueryFinishTimestamp())); + + // Collect all profiles that has been stored to storage + queryIdToProfileMap.forEach((queryId, profileElement) -> { + if (profileElement.profile.shouldStoreToStorage()) { + profileDeque.add(profileElement); + } + }); + + List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList(); + + while (profileDeque.size() > maxProfilesOnStorage) { + // First profile is the oldest profile + queryIdToBeRemoved.add(profileDeque.poll()); + } + + return queryIdToBeRemoved; + } + + // We can not store all profiles on storage, because the storage space is limited + // So we need to remove the outdated profiles + private void deleteOutdatedProfilesFromStorage() { + try { + List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList(); + readLock.lock(); + try { + queryIdToBeRemoved = getProfilesToBeRemoved(); + } finally { + readLock.unlock(); + } + + List<Thread> iothreads = Lists.newArrayList(); + + for (ProfileElement profileElement : queryIdToBeRemoved) { + Thread thread = new Thread(() -> { + profileElement.deleteFromStorage(); + }); + 
thread.start(); + iothreads.add(thread); + } + + try { + for (Thread thread : iothreads) { + thread.join(); + } + } catch (InterruptedException e) { + LOG.error("Failed to remove outdated query profile", e); + } + + writeLock.lock(); + try { + for (ProfileElement profileElement : queryIdToBeRemoved) { + queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId()); + TUniqueId thriftQueryId = DebugUtil.parseTUniqueIdFromString( + profileElement.profile.getSummaryProfile().getProfileId()); + queryIdToExecutionProfiles.remove(thriftQueryId); + } + } finally { + writeLock.unlock(); + } + + if (queryIdToBeRemoved.size() != 0 && LOG.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + for (ProfileElement profileElement : queryIdToBeRemoved) { + builder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(","); + } + LOG.debug("Remove outdated profile: {}", builder.toString()); + } + } catch (Exception e) { + LOG.error("Failed to remove outdated query profile", e); + } + } + + private List<String> getBrokenProfiles() { + List<String> profilesOnStorage = getOnStorageProfileInfos(); + List<String> brokenProfiles = Lists.newArrayList(); + + for (String profileDirAbsPath : profilesOnStorage) { + int separatorIdx = profileDirAbsPath.lastIndexOf(File.separator); + if (separatorIdx == -1) { + LOG.warn("Invalid profile path {}", profileDirAbsPath); + brokenProfiles.add(profileDirAbsPath); + continue; + } + + String profileId = ""; + + try { + String timeStampAndId = profileDirAbsPath.substring(separatorIdx + 1); + String[] parsed = Profile.parseProfileFileName(timeStampAndId); + if (parsed == null) { + LOG.warn("Invalid profile directory path: {}", profileDirAbsPath); + brokenProfiles.add(profileDirAbsPath); + continue; + } else { + profileId = parsed[1]; + } + } catch (Exception e) { + LOG.error("Failed to get profile id from path: {}", profileDirAbsPath, e); + brokenProfiles.add(profileDirAbsPath); + continue; + } + + 
readLock.lock(); + try { + if (!queryIdToProfileMap.containsKey(profileId)) { + LOG.debug("Wild profile {}, need to be removed.", profileDirAbsPath); + brokenProfiles.add(profileDirAbsPath); + } + } finally { + readLock.unlock(); + } + } + + return brokenProfiles; + } + + private void deleteBrokenProfiles() { + List<String> brokenProfiles = getBrokenProfiles(); + List<Thread> iothreads = Lists.newArrayList(); + + for (String brokenProfile : brokenProfiles) { + Thread iothread = new Thread(() -> { + try { + File profileFile = new File(brokenProfile); + if (!profileFile.isFile()) { + LOG.warn("Profile path {} is not a file, can not delete.", brokenProfile); + return; + } + + FileUtils.deleteQuietly(profileFile); + LOG.debug("Delete broken profile: {}", brokenProfile); + } catch (Exception e) { + LOG.error("Failed to delete broken profile: {}", brokenProfile, e); + } + }); + iothread.start(); + iothreads.add(iothread); + } + + for (Thread iothread : iothreads) { + try { + iothread.join(); + } catch (InterruptedException e) { + LOG.error("Failed to remove broken profile", e); + } + } + } + + // The init value of query finish time of profile is MAX_VALUE + // So more recent query will be on the top of heap. + private PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() { + PriorityQueue<ProfileElement> queryIdDeque = new PriorityQueue<>(Comparator.comparingLong( + (ProfileElement profileElement) -> profileElement.profile.getQueryFinishTimestamp()).reversed()); + + queryIdToProfileMap.forEach((queryId, profileElement) -> { + queryIdDeque.add(profileElement); + }); + + return queryIdDeque; + } + + // When the query is finished, the execution profile should be marked as finished + // For load task, one of its execution profile is finished. 
+ public void markQueryFinished(TUniqueId queryId) { + readLock.lock(); + try { + ExecutionProfile execProfile = queryIdToExecutionProfiles.get(queryId); + if (execProfile == null) { Review Comment: 一个 Profile 对应多个 ExecutionProfile，这里不要再从 ProfileManager 里查找了：Profile 对象本身就包含了它的多个 ExecutionProfile，直接对它们全部进行设置即可。(English: One Profile corresponds to multiple ExecutionProfiles, so don't look them up in ProfileManager here; the Profile object already holds all of its ExecutionProfiles, so just set (mark finished) all of them directly.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org