This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 939138e0411 [test](profile) More unit test for profile on FE (#47623) 939138e0411 is described below commit 939138e041170e029a96a42f334538e54ef70c10 Author: zhiqiang <hezhiqi...@selectdb.com> AuthorDate: Wed Feb 26 22:00:30 2025 +0800 [test](profile) More unit test for profile on FE (#47623) --- be/src/pipeline/exec/operator.cpp | 2 - be/src/util/runtime_profile.cpp | 7 +- be/src/util/runtime_profile.h | 9 - .../org/apache/doris/common/profile/Profile.java | 4 +- .../doris/common/profile/ProfileManager.java | 33 +- .../doris/common/profile/RuntimeProfile.java | 42 +- .../doris/common/profile/ProfileManagerTest.java | 437 +++++++++++++++++++++ .../common/profile/RuntimeProfileMergeTest.java | 296 ++++++++++++++ .../doris/common/profile/RuntimeProfileTest.java | 18 +- gensrc/thrift/RuntimeProfile.thrift | 3 +- 10 files changed, 776 insertions(+), 75 deletions(-) diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 8407b39e7f5..443f49b572f 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -464,7 +464,6 @@ template <typename SharedStateArg> Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) { _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix())); _runtime_profile->set_metadata(_parent->node_id()); - _runtime_profile->set_is_sink(false); // indent is false so that source operator will have same // indentation_level with its parent operator. info.parent_profile->add_child(_runtime_profile.get(), /*indent=*/false, nullptr); @@ -540,7 +539,6 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink // create profile _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + name_suffix())); _profile->set_metadata(_parent->node_id()); - _profile->set_is_sink(true); _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>; if constexpr (!is_fake_shared) { diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp index 84010070f4f..9b8356aa7f7 100644 --- a/be/src/util/runtime_profile.cpp +++ b/be/src/util/runtime_profile.cpp @@ -300,9 +300,7 @@ RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool inden if (this->is_set_metadata()) { child->set_metadata(this->metadata()); } - if (this->is_set_sink()) { - child->set_is_sink(this->is_sink()); - } + if (_children.empty()) { add_child_unlock(child, indent, nullptr); } else { @@ -558,9 +556,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64 pr node.metadata = _metadata; node.timestamp = _timestamp; node.indent = true; - if (this->is_set_sink()) { - node.__set_is_sink(this->is_sink()); - } { std::lock_guard<std::mutex> l(_counter_map_lock); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index bbba622f6c8..359b0cb189c 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -422,15 +422,6 @@ public: bool is_set_metadata() const { return _is_set_metadata; } - void set_is_sink(bool is_sink) { - _is_set_sink = true; - _is_sink = is_sink; - } - - bool is_sink() const { return _is_sink; } - - bool is_set_sink() const { return _is_set_sink; } - time_t timestamp() const { return _timestamp; } void set_timestamp(time_t ss) { _timestamp = ss; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index c99e602c24e..e0922b1526d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -86,7 +86,7 @@ public class Profile { private SummaryProfile summaryProfile = new SummaryProfile(); // executionProfiles will be stored to storage as text, when getting profile content, we will read // from storage directly. - private List<ExecutionProfile> executionProfiles = Lists.newArrayList(); + List<ExecutionProfile> executionProfiles = Lists.newArrayList(); // profileStoragePath will only be assigned when: // 1. profile is stored to storage // 2. or profile is loaded from storage @@ -94,7 +94,7 @@ public class Profile { // isQueryFinished means the coordinator or stmt executor is finished. // does not mean the profile report has finished, since the report is async. // finish of collection of profile is marked by isCompleted of ExecutionProfiles. - private boolean isQueryFinished = false; + boolean isQueryFinished = false; // when coordinator finishes, it will mark finish time. // we will wait for about 5 seconds to see if all profiles have been reported. // if not, we will store the profile to storage, and release the memory, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java index f174b6b7dcc..63380d082f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MasterDaemon; @@ -71,7 +70,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; public class ProfileManager extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(ProfileManager.class); private static volatile ProfileManager INSTANCE = null; - private static final String PROFILE_STORAGE_PATH = Config.spilled_profile_storage_path; + static String PROFILE_STORAGE_PATH = Config.spilled_profile_storage_path; public enum ProfileType { QUERY, @@ -131,10 +130,10 @@ public class ProfileManager extends MasterDaemon { // profile id is long string for broker load // is TUniqueId for others. - private Map<String, ProfileElement> queryIdToProfileMap; + final Map<String, ProfileElement> queryIdToProfileMap; // Sometimes one Profile is related with multiple execution profiles(Broker-load), so that // execution profile's query id is not related with Profile's query id. - private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles; + final Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles; private final ExecutorService fetchRealTimeProfileExecutor; private final ExecutorService profileIOExecutor; @@ -168,9 +167,9 @@ public class ProfileManager extends MasterDaemon { private ProfileElement createElement(Profile profile) { ProfileElement element = new ProfileElement(profile); element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings()); - // Not init builder any more, we will not maintain it since 2.1.0, because the structure + // Not init builder anymore, we will not maintain it since 2.1.0, because the structure // assume that the execution profiles structure is already known before execution. But in - // PipelineX Engine, it will changed during execution. + // PipelineX Engine, it will be changed during execution. return element; } @@ -421,10 +420,6 @@ public class ProfileManager extends MasterDaemon { /** * Check if the query with specific query id is queried by specific user. - * - * @param user - * @param queryId - * @throws DdlException */ public void checkAuthByUserAndQueryId(String user, String queryId) throws AuthenticationException { readLock.lock(); @@ -484,7 +479,7 @@ public class ProfileManager extends MasterDaemon { // List PROFILE_STORAGE_PATH and return all dir names // string will contain profile id and its storage timestamp - private List<String> getOnStorageProfileInfos() { + List<String> getOnStorageProfileInfos() { List<String> res = Lists.newArrayList(); try { File profileDir = new File(PROFILE_STORAGE_PATH); @@ -509,7 +504,7 @@ public class ProfileManager extends MasterDaemon { // read profile file on storage // deserialize to an object Profile // push them to memory structure of ProfileManager for index - private void loadProfilesFromStorageIfFirstTime() { + void loadProfilesFromStorageIfFirstTime() { if (this.isProfileLoaded) { return; } @@ -556,7 +551,7 @@ public class ProfileManager extends MasterDaemon { } } - private void createProfileStorageDirIfNecessary() { + void createProfileStorageDirIfNecessary() { File profileDir = new File(PROFILE_STORAGE_PATH); if (profileDir.exists()) { return; @@ -570,7 +565,7 @@ public class ProfileManager extends MasterDaemon { } } - private List<ProfileElement> getProfilesNeedStore() { + List<ProfileElement> getProfilesNeedStore() { List<ProfileElement> profilesToBeStored = Lists.newArrayList(); queryIdToProfileMap.forEach((queryId, profileElement) -> { @@ -585,7 +580,7 @@ public class ProfileManager extends MasterDaemon { // Collect profiles that need to be stored to storage // Store them to storage // Release the memory - private void writeProfileToStorage() { + void writeProfileToStorage() { try { if (Strings.isNullOrEmpty(PROFILE_STORAGE_PATH)) { LOG.error("Logical error, PROFILE_STORAGE_PATH is empty"); @@ -639,7 +634,7 @@ public class ProfileManager extends MasterDaemon { } } - private List<ProfileElement> getProfilesToBeRemoved() { + List<ProfileElement> getProfilesToBeRemoved() { // By order of query finish timestamp // The profile with the least storage timestamp will be on the top of heap PriorityQueue<ProfileElement> profileDeque = new PriorityQueue<>(Comparator.comparingLong( @@ -671,7 +666,7 @@ public class ProfileManager extends MasterDaemon { // We can not store all profiles on storage, because the storage space is limited // So we need to remove the outdated profiles - private void deleteOutdatedProfilesFromStorage() { + void deleteOutdatedProfilesFromStorage() { try { List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList(); readLock.lock(); @@ -723,7 +718,7 @@ public class ProfileManager extends MasterDaemon { } } - private List<String> getBrokenProfiles() { + List<String> getBrokenProfiles() { List<String> profilesOnStorage = getOnStorageProfileInfos(); List<String> brokenProfiles = Lists.newArrayList(); @@ -767,7 +762,7 @@ public class ProfileManager extends MasterDaemon { return brokenProfiles; } - private void deleteBrokenProfiles() { + void deleteBrokenProfiles() { List<String> brokenProfiles = getBrokenProfiles(); List<Future<?>> profileDeleteFutures = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java index 08f5122b768..d3321d7de6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java @@ -265,14 +265,13 @@ public class RuntimeProfile { if (node.isSetMetadata()) { this.nodeid = (int) node.getMetadata(); } - if (node.isSetIsSink()) { - this.isSinkOperator = node.is_sink; - } + Preconditions.checkState(timestamp == -1 || node.timestamp != -1); // update this level's counters if (node.counters != null) { for (TCounter tcounter : node.counters) { - Counter counter = counterMap.get(tcounter.name); + // If different node has counter with the same name, it will lead to chaos. + Counter counter = this.counterMap.get(tcounter.name); if (counter == null) { counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level)); } else { @@ -460,10 +459,10 @@ public class RuntimeProfile { if (planNodeInfos.isEmpty()) { return; } - builder.append(prefix + "- " + "PlanInfo\n"); + builder.append(prefix).append("- ").append("PlanInfo\n"); for (String info : planNodeInfos) { - builder.append(prefix + " - " + info + "\n"); + builder.append(prefix).append(" - ").append(info).append("\n"); } } @@ -497,6 +496,7 @@ public class RuntimeProfile { RuntimeProfile templateProfile = profiles.get(0); for (int i = 0; i < templateProfile.childList.size(); i++) { RuntimeProfile templateChildProfile = templateProfile.childList.get(i).first; + // Traverse all profiles and get the child profile with the same name List<RuntimeProfile> allChilds = getChildListFromLists(templateChildProfile.name, profiles); RuntimeProfile newCreatedMergedChildProfile = new RuntimeProfile(templateChildProfile.name, templateChildProfile.nodeId()); @@ -510,7 +510,7 @@ public class RuntimeProfile { } } - private static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles, + static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles, RuntimeProfile simpleProfile) { if (profiles.isEmpty()) { return; @@ -540,7 +540,7 @@ public class RuntimeProfile { Counter oldCounter = templateCounterMap.get(childCounterName); AggCounter aggCounter = new AggCounter(oldCounter.getType()); for (RuntimeProfile profile : profiles) { - // orgCounter could be null if counter structure is changed + // orgCounter could be null if counter-structure is changed // change of counter structure happens when NonZeroCounter is involved. // So here we have to ignore the counter if it is not found in the profile. Counter orgCounter = profile.counterMap.get(childCounterName); @@ -699,7 +699,7 @@ public class RuntimeProfile { } finally { childLock.writeLock().unlock(); } - // insert plan node info to profile strinfo + // insert plan node info to profile string info if (planNodeMap == null || !planNodeMap.containsKey(child.nodeId())) { return; } @@ -723,30 +723,6 @@ public class RuntimeProfile { } } - public void addFirstChild(RuntimeProfile child) { - if (child == null) { - return; - } - childLock.writeLock().lock(); - try { - if (childMap.containsKey(child.name)) { - childList.removeIf(e -> e.first.name.equals(child.name)); - } - this.childMap.put(child.name, child); - Pair<RuntimeProfile, Boolean> pair = Pair.of(child, true); - this.childList.addFirst(pair); - } finally { - childLock.writeLock().unlock(); - } - } - - // Because the profile of summary and child fragment is not a real parent-child - // relationship - // Each child profile needs to calculate the time proportion consumed by itself - public void computeTimeInChildProfile() { - childMap.values().forEach(RuntimeProfile::computeTimeInProfile); - } - public void computeTimeInProfile() { computeTimeInProfile(this.counterTotalTime.getValue()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java index d0d17505861..dc979168b67 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java @@ -17,6 +17,14 @@ package org.apache.doris.common.profile; +import org.apache.doris.common.Config; +import org.apache.doris.common.profile.ProfileManager.ProfileElement; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Assertions; @@ -24,12 +32,16 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; class ProfileManagerTest { @@ -274,4 +286,429 @@ class ProfileManagerTest { } } } + + @Test + void cleanProfileTest() { + // Create and push profile + final int normalProfiles = 100; + for (int i = 0; i < normalProfiles; i++) { + Profile profile = constructProfile(String.valueOf(i)); + Random random = new Random(); + profile.setQueryFinishTimestamp(random.nextInt(200 - 101) + 101); + // set query start time in range of [0, 1000) + profile.getSummaryProfile().setQueryBeginTime(random.nextInt(100)); + profileManager.pushProfile(profile); + } + // Clean profile + profileManager.cleanProfile(); + // Make sure map is cleaned. + Assertions.assertTrue(profileManager.queryIdToProfileMap.isEmpty()); + Assertions.assertTrue(profileManager.queryIdToExecutionProfiles.isEmpty()); + } + + @Test + void addExecutionProfileTest() { + final int normalProfiles = 100; + for (int i = 0; i < normalProfiles; i++) { + Profile profile = constructProfile(String.valueOf(i)); + Random random = new Random(); + profile.setQueryFinishTimestamp(random.nextInt(200 - 101) + 101); + profile.getSummaryProfile().setQueryBeginTime(random.nextInt(100)); + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + List<Integer> fragments = new ArrayList<>(); + ExecutionProfile executionProfile = new ExecutionProfile(queryId, fragments); + profile.addExecutionProfile(executionProfile); + if (i == normalProfiles - 1) { + profileManager.addExecutionProfile(null); + } else { + for (ExecutionProfile executionProfileTemp : profile.getExecutionProfiles()) { + profileManager.addExecutionProfile(executionProfileTemp); + } + } + } + + Assertions.assertEquals(normalProfiles - 1, profileManager.queryIdToExecutionProfiles.size()); + } + + @Test + void getOnStorageProfileInfosTest() throws Exception { + // Create a temporary directory for profile storage + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + try { + // Override config path to use temp dir + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create some test profile files + for (int i = 0; i < 3; i++) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + File profileFile = new File(tempDir, System.currentTimeMillis() + '_' + DebugUtil.printId(queryId)); + profileFile.createNewFile(); + } + + // Get profiles from storage + List<String> profiles = profileManager.getOnStorageProfileInfos(); + + // Verify result + Assertions.assertEquals(3, profiles.size()); + for (String profile : profiles) { + Assertions.assertTrue(profile.startsWith(tempDir.getAbsolutePath())); + } + } finally { + // Restore original path + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + // Cleanup temp files + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testLoadProfile() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + try { + profileManager.isProfileLoaded = false; + // Override config path to use temp dir + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create some test profile files + for (int i = 0; i < 30; i++) { + // Sleep 200 ms, so that query finish time is different. + Thread.sleep(200); + Profile profile = ProfilePersistentTest.constructRandomProfile(1); + profile.writeToStorage(ProfileManager.PROFILE_STORAGE_PATH); + } + + // Get profiles from storage + profileManager.loadProfilesFromStorageIfFirstTime(); + Assertions.assertTrue(profileManager.isProfileLoaded); + // Verify result + Assertions.assertEquals(30, profileManager.queryIdToProfileMap.size()); + Assertions.assertEquals(0, profileManager.queryIdToExecutionProfiles.size()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Restore original path + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + // Cleanup temp files + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testGetProfilesNeedStore() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + try { + profileManager.isProfileLoaded = false; + // Override config path to use temp dir + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create some test profile files + for (int i = 0; i < 30; i++) { + // Sleep 200 ms, so that query finish time is different. + Thread.sleep(100); + Profile profile = ProfilePersistentTest.constructRandomProfile(1); + profile.isQueryFinished = true; + profile.setQueryFinishTimestamp(System.currentTimeMillis()); + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + List<Integer> fragments = new ArrayList<>(); + profile.addExecutionProfile(new ExecutionProfile(queryId, fragments)); + if (i % 2 == 0) { + new Expectations(profile) { + { + profile.shouldStoreToStorage(); + result = true; + } + }; + } else { + new Expectations(profile) { + { + profile.shouldStoreToStorage(); + result = false; + } + }; + } + profileManager.pushProfile(profile); + } + + List<ProfileElement> profiles = profileManager.getProfilesNeedStore(); + + // Verify result + Assertions.assertEquals(30 / 2, profiles.size()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Restore original path + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + // Cleanup temp files + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testWriteProfileToStorage() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + try { + profileManager.isProfileLoaded = false; + // Override config path to use temp dir + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create some test profile files + for (int i = 0; i < 30; i++) { + // Sleep 200 ms, so that query finish time is different. + Thread.sleep(100); + Profile profile = ProfilePersistentTest.constructRandomProfile(1); + profile.isQueryFinished = true; + profile.setQueryFinishTimestamp(System.currentTimeMillis()); + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + List<Integer> fragments = new ArrayList<>(); + profile.addExecutionProfile(new ExecutionProfile(queryId, fragments)); + for (ExecutionProfile executionProfile : profile.getExecutionProfiles()) { + profileManager.addExecutionProfile(executionProfile); + } + + // Make sure all profile is released + new Expectations(profile) { + { + profile.shouldStoreToStorage(); + result = true; + profile.releaseMemory(); + times = 1; + } + }; + + profileManager.pushProfile(profile); + } + + profileManager.writeProfileToStorage(); + + // Verify result + File[] files = tempDir.listFiles(); + assert files != null; + Assertions.assertEquals(30, files.length); + Assertions.assertEquals(30, profileManager.queryIdToProfileMap.size()); + Assertions.assertEquals(0, profileManager.queryIdToExecutionProfiles.size()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Restore original path + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + // Cleanup temp files + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testGetProfilesToBeRemoved() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + int originMaxSpilledProfileNum = Config.max_spilled_profile_num; + + try { + Config.max_spilled_profile_num = 10; + // Override config path to use temp dir + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create some test profile files + for (int i = 0; i < 30; i++) { + // Sleep 200 ms, so that query finish time is different. + Thread.sleep(100); + Profile profile = ProfilePersistentTest.constructRandomProfile(1); + profile.isQueryFinished = true; + profile.setQueryFinishTimestamp(System.currentTimeMillis()); + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + List<Integer> fragments = new ArrayList<>(); + profile.addExecutionProfile(new ExecutionProfile(queryId, fragments)); + for (ExecutionProfile executionProfile : profile.getExecutionProfiles()) { + profileManager.addExecutionProfile(executionProfile); + } + new Expectations(profile) { + { + profile.profileHasBeenStored(); + result = true; + } + }; + + profileManager.pushProfile(profile); + } + + List<ProfileElement> remove = profileManager.getProfilesToBeRemoved(); + + // Verify result + Assertions.assertEquals(remove.size(), 30 - Config.max_spilled_profile_num); + PriorityQueue<ProfileElement> notRemove = profileManager.getProfileOrderByQueryFinishTimeDesc(); + List<ProfileElement> notRemove2 = Lists.newArrayList(); + for (int i = 0; i < Config.max_spilled_profile_num; i++) { + notRemove2.add(notRemove.poll()); + } + + for (ProfileElement profileElement : notRemove2) { + long timestamp = profileElement.profile.getQueryFinishTimestamp(); + for (ProfileElement removeProfile : remove) { + // Make sure timestamp is larger than all removed profile. + Assertions.assertTrue(timestamp > removeProfile.profile.getQueryFinishTimestamp()); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Restore original path + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + // Cleanup temp files + FileUtils.deleteDirectory(tempDir); + Config.max_spilled_profile_num = originMaxSpilledProfileNum; + } + } + + + @Test + void testDeleteOutdatedProfilesFromStorage() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + int originMaxSpilledProfileNum = Config.max_spilled_profile_num; + + try { + Config.max_spilled_profile_num = 10; + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create test profiles + for (int i = 0; i < 30; i++) { + Thread.sleep(100); + Profile profile = ProfilePersistentTest.constructRandomProfile(1); + profile.isQueryFinished = true; + profile.setQueryFinishTimestamp(System.currentTimeMillis()); + int finalI = i; + new Expectations(profile) { + { + profile.profileHasBeenStored(); + result = true; + profile.deleteFromStorage(); + times = finalI < 20 ? 1 : 0; // First 20 should be deleted + } + }; + + profileManager.pushProfile(profile); + } + + // Execute deletion + profileManager.deleteOutdatedProfilesFromStorage(); + + // Verify correct profiles were deleted + Assertions.assertEquals(10, profileManager.queryIdToProfileMap.size()); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + Config.max_spilled_profile_num = originMaxSpilledProfileNum; + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testGetBrokenProfiles() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + + try { + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create normal profiles + for (int i = 0; i < 3; i++) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + String profileId = DebugUtil.printId(queryId); + + // Create profile in memory + Profile profile = constructProfile(profileId); + profileManager.pushProfile(profile); + + // Create profile file + File profileFile = new File(tempDir, System.currentTimeMillis() + "_" + profileId); + profileFile.createNewFile(); + } + + // Create broken profiles (no corresponding memory entry) + for (int i = 0; i < 2; i++) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + File brokenFile = new File(tempDir, System.currentTimeMillis() + "_" + DebugUtil.printId(queryId)); + brokenFile.createNewFile(); + } + + // Get broken profiles + List<String> brokenProfiles = profileManager.getBrokenProfiles(); + + // Verify result - should find 2 broken profiles + Assertions.assertEquals(2, brokenProfiles.size()); + for (String profile : brokenProfiles) { + Assertions.assertTrue(profile.startsWith(tempDir.getAbsolutePath())); + } + + } finally { + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + FileUtils.deleteDirectory(tempDir); + } + } + + @Test + void testDeleteBrokenProfiles() throws IOException { + File tempDir = Files.createTempDirectory("profile_test").toFile(); + String originalPath = ProfileManager.PROFILE_STORAGE_PATH; + + try { + ProfileManager.PROFILE_STORAGE_PATH = tempDir.getAbsolutePath(); + + // Create normal and broken profile files + List<File> normalFiles = new ArrayList<>(); + List<File> brokenFiles = new ArrayList<>(); + + // Create normal profiles with memory entries + for (int i = 0; i < 3; i++) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + String profileId = DebugUtil.printId(queryId); + + Profile profile = constructProfile(profileId); + profileManager.pushProfile(profile); + + File normalFile = new File(tempDir, System.currentTimeMillis() + "_" + profileId); + normalFile.createNewFile(); + normalFiles.add(normalFile); + } + + // Create broken profiles (no memory entries) + for (int i = 0; i < 2; i++) { + UUID taskId = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + File brokenFile = new File(tempDir, System.currentTimeMillis() + "_" + DebugUtil.printId(queryId)); + brokenFile.createNewFile(); + brokenFiles.add(brokenFile); + } + + // Delete broken profiles + profileManager.deleteBrokenProfiles(); + + // Verify normal files still exist + for (File file : normalFiles) { + Assertions.assertTrue(file.exists()); + } + + // Verify broken files were deleted + for (File file : brokenFiles) { + Assertions.assertFalse(file.exists()); + } + + } finally { + ProfileManager.PROFILE_STORAGE_PATH = originalPath; + FileUtils.deleteDirectory(tempDir); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileMergeTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileMergeTest.java new file mode 100644 index 00000000000..10b03155523 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileMergeTest.java @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.thrift.TCounter; +import org.apache.doris.thrift.TRuntimeProfileNode; +import org.apache.doris.thrift.TRuntimeProfileTree; +import org.apache.doris.thrift.TUnit; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Set; + +public class RuntimeProfileMergeTest { + private static final Logger LOG = LogManager.getLogger(RuntimeProfileMergeTest.class); + + @Test + public void testMergeCounter() { + RuntimeProfile profile1 = new RuntimeProfile("profile1"); + RuntimeProfile profile2 = new RuntimeProfile("profile2"); + + Counter counter1 = profile1.addCounter("Counter1", TUnit.UNIT, RuntimeProfile.ROOT_COUNTER); + Counter counter2 = profile1.addCounter("Counter2", TUnit.UNIT, RuntimeProfile.ROOT_COUNTER); + counter1.setValue(101); + counter2.setValue(102); + Counter counter1Child1 = profile1.addCounter("Counter1_Child1", TUnit.UNIT, "Counter1"); + Counter counter2Child1 = profile1.addCounter("Counter2_Child1", TUnit.UNIT, "Counter2"); + counter1Child1.setValue(1011); + counter2Child1.setValue(1021); + counter1.setLevel(1); + counter2.setLevel(1); + counter1Child1.setLevel(1); + counter2Child1.setLevel(1); + + profile2.addCounter("Counter1", counter1, RuntimeProfile.ROOT_COUNTER); + profile2.addCounter("Counter2", counter2, RuntimeProfile.ROOT_COUNTER); + + RuntimeProfile mergeProfile = new RuntimeProfile("mergeProfile"); + RuntimeProfile.mergeCounters(RuntimeProfile.ROOT_COUNTER, Lists.newArrayList(profile1, profile2), mergeProfile); + /* + mergeProfile: + - Counter1: sum 202, avg 101, max 101, min 101 + - Counter1_Child1: sum 1.011K (1011), avg 1.011K (1011), max 1.011K (1011), min 1.011K (1011) + - Counter2: sum 204, avg 102, max 102, min 102 + - Counter2_Child1: sum 1.021K (1021), avg 1.021K (1021), max 1.021K (1021), min 1.021K (1021) + * */ + LOG.info("Profile1:\n{}", mergeProfile.toString()); + + Assert.assertTrue(mergeProfile.getCounterMap().get("Counter1") instanceof AggCounter); + AggCounter aggCounter = (AggCounter) mergeProfile.getCounterMap().get("Counter1"); + Assert.assertEquals(aggCounter.sum.getValue(), 202); + Assert.assertEquals(aggCounter.number, 2); + } + + @Test + public void testMergeProfileNormal() { + TRuntimeProfileTree tRuntimeProfileTree = new TRuntimeProfileTree(); + TRuntimeProfileNode rootNode = new TRuntimeProfileNode(); + rootNode.name = "rootNode"; + rootNode.num_children = 2; + tRuntimeProfileTree.addToNodes(rootNode); + TRuntimeProfileNode node1 = new TRuntimeProfileNode(); + TRuntimeProfileNode node2 = new TRuntimeProfileNode(); + node1.name = "node1"; + node2.name = "node2"; + node1.num_children = 0; + node2.num_children = 0; + tRuntimeProfileTree.addToNodes(node1); + tRuntimeProfileTree.addToNodes(node2); + + node1.child_counters_map = new HashMap<>(); + node1.counters = new ArrayList<>(); + node2.child_counters_map = new HashMap<>(); + node2.counters = new ArrayList<>(); + + TCounter counter1 = new TCounter("Counter1", TUnit.UNIT, 1); + TCounter counter2 = new TCounter("Counter2", TUnit.UNIT, 1); + TCounter counter1Child = new TCounter("Counter1-Child1", TUnit.UNIT, 1); + TCounter counter2Child = new TCounter("Counter2-Child1", TUnit.UNIT, 1); + // Set counter-level to 1 so that the counter could be merged. + counter1.setLevel(1); + counter2.setLevel(1); + counter1Child.setLevel(1); + counter2Child.setLevel(1); + HashMap<String, Set<String>> childCountersMap = new HashMap<>(); + childCountersMap.put("", Sets.newHashSet("Counter1", "Counter2")); + childCountersMap.put("Counter1", Sets.newHashSet("Counter1-Child1")); + childCountersMap.put("Counter2", Sets.newHashSet("Counter2-Child1")); + + node1.counters.add(counter1); + node1.counters.add(counter2); + node1.counters.add(counter1Child); + node1.counters.add(counter2Child); + + node2.counters.add(counter1); + node2.counters.add(counter2); + node2.counters.add(counter1Child); + node2.counters.add(counter2Child); + + node1.child_counters_map = childCountersMap; + node2.child_counters_map = childCountersMap; + + // Each RuntimeProfile has a Counter named "TotalTime" after it is created. + RuntimeProfile profile1 = new RuntimeProfile("profile1"); + RuntimeProfile profile2 = new RuntimeProfile("profile2"); + RuntimeProfile profile3 = new RuntimeProfile("profile3"); + /* + profile1: + node1: + - Counter1: 1 + - Counter1-Child1: 1 + - Counter2: 1 + - Counter2-Child1: 1 + node2: + - Counter1: 1 + - Counter1-Child1: 1 + - Counter2: 1 + - Counter2-Child1: 1 + * */ + profile1.update(tRuntimeProfileTree); + profile2.update(tRuntimeProfileTree); + profile3.update(tRuntimeProfileTree); + + LOG.info("Profile1:\n{}", profile1.toString()); + + /* + * + mergedProfile: + node1: + - Counter1: sum 3, avg 1, max 1, min 1 + - Counter1-Child1: sum 3, avg 1, max 1, min 1 + - Counter2: sum 3, avg 1, max 1, min 1 + - Counter2-Child1: sum 3, avg 1, max 1, min 1 + node2: + - Counter1: sum 3, avg 1, max 1, min 1 + - Counter1-Child1: sum 3, avg 1, max 1, min 1 + - Counter2: sum 3, avg 1, max 1, min 1 + - Counter2-Child1: sum 3, avg 1, max 1, min 1 + * */ + RuntimeProfile mergedProfile = new RuntimeProfile("mergedProfile"); + RuntimeProfile.mergeProfiles(Lists.newArrayList(profile1, profile2, profile3), mergedProfile, null); + + StringBuilder builder = new StringBuilder(); + mergedProfile.prettyPrint(builder, "\t"); + LOG.info("Merged profile:\n{}", builder.toString()); + + Assert.assertEquals(mergedProfile.getChildList().size(), 2); + Assert.assertTrue( + mergedProfile.getChildList().get(0).first.getCounterMap().get("Counter1") instanceof AggCounter); + AggCounter aggCounterNode1 = (AggCounter) mergedProfile.getChildList().get(0).first.getCounterMap() + .get("Counter1"); + Assert.assertEquals(aggCounterNode1.sum.getValue(), 3); + Assert.assertEquals(aggCounterNode1.number, 3); + } + + // Test the case where counter of RuntimeProfile has different structure. + // When non-ZeroCounter is involved, counter-structure of RuntimeProfile is different. + @Test + public void testMergeProfileWithDifferentCounter() { + /* + profile1: + node1: + - Counter1: 1 + - Counter1-Child1: 1 + - Counter2: 1 + - Counter2-Child1: 1 + node2: + - Counter1: 1 + - Counter1-Child1: 1 + - Counter2: 1 + - Counter2-Child1: 1 + * */ + TRuntimeProfileTree tRuntimeProfileTree1 = new TRuntimeProfileTree(); + TRuntimeProfileNode rootNode = new TRuntimeProfileNode(); + rootNode.name = "rootNode"; + rootNode.num_children = 2; + tRuntimeProfileTree1.addToNodes(rootNode); + TRuntimeProfileNode node1 = new TRuntimeProfileNode(); + TRuntimeProfileNode node2 = new TRuntimeProfileNode(); + node1.name = "node1"; + node2.name = "node2"; + node1.num_children = 0; + node2.num_children = 0; + tRuntimeProfileTree1.addToNodes(node1); + tRuntimeProfileTree1.addToNodes(node2); + + node1.child_counters_map = new HashMap<>(); + node1.counters = new ArrayList<>(); + node2.child_counters_map = new HashMap<>(); + node2.counters = new ArrayList<>(); + + TCounter counter1 = new TCounter("Counter1", TUnit.UNIT, 1); + TCounter counter2 = new TCounter("Counter2", TUnit.UNIT, 1); + TCounter counter1Child = new TCounter("Counter1-Child1", TUnit.UNIT, 1); + TCounter counter2Child = new TCounter("Counter2-Child1", TUnit.UNIT, 1); + // Set counter-level to 1 so that the counter could be merged. + counter1.setLevel(1); + counter2.setLevel(1); + counter1Child.setLevel(1); + counter2Child.setLevel(1); + HashMap<String, Set<String>> childCountersMap = new HashMap<>(); + childCountersMap.put("", Sets.newHashSet("Counter1", "Counter2")); + childCountersMap.put("Counter1", Sets.newHashSet("Counter1-Child1")); + childCountersMap.put("Counter2", Sets.newHashSet("Counter2-Child1")); + + node1.counters.addAll(Lists.newArrayList(counter1, counter2, counter1Child, counter2Child)); + node1.child_counters_map = childCountersMap; + node2.counters.addAll(Lists.newArrayList(counter1, counter2, counter1Child, counter2Child)); + node2.child_counters_map = childCountersMap; + RuntimeProfile profile1 = new RuntimeProfile("profile1"); + profile1.update(tRuntimeProfileTree1); + + /* + profile1: + node1: + - Counter2: 1 + - Counter2-Child1: 1 + node2: + - Counter1: 1 + - Counter1-Child1: 1 + * */ + TRuntimeProfileTree tRuntimeProfileTree2 = new TRuntimeProfileTree(); + TRuntimeProfileNode rootNode2 = new TRuntimeProfileNode(); + rootNode2.name = "rootNode"; + rootNode2.num_children = 2; + tRuntimeProfileTree2.addToNodes(rootNode2); + + node1.counters.clear(); + node2.counters.clear(); + + node1.counters.addAll(Lists.newArrayList(counter2, counter2Child)); + node2.counters.addAll(Lists.newArrayList(counter1, counter1Child)); + + node1.child_counters_map = new HashMap<>(); + node1.child_counters_map.put("", Sets.newHashSet("Counter2")); + node1.child_counters_map.put("Counter2", Sets.newHashSet("Counter2-Child1")); + + node2.child_counters_map = new HashMap<>(); + node2.child_counters_map.put("", Sets.newHashSet("Counter1")); + node2.child_counters_map.put("Counter1", Sets.newHashSet("Counter1-Child1")); + + tRuntimeProfileTree2.addToNodes(node1); + tRuntimeProfileTree2.addToNodes(node2); + + RuntimeProfile profile2 = new RuntimeProfile("profile2"); + profile2.update(tRuntimeProfileTree2); + + // Let's merge them. + // Profile 1 and profile 2 have different counter-structure. + // But they can still do merge. + RuntimeProfile mergedProfile = new RuntimeProfile("mergedProfile"); + RuntimeProfile.mergeProfiles(Lists.newArrayList(profile1, profile2), mergedProfile, null); + + StringBuilder builder = new StringBuilder(); + mergedProfile.prettyPrint(builder, "\t"); + LOG.info("Merged profile:\n{}", builder.toString()); + + /* + * + mergedProfile: + node1: + - Counter1: sum 1, avg 1, max 1, min 1 + - Counter1-Child1: sum 1, avg 1, max 1, min 1 + - Counter2: sum 2, avg 1, max 1, min 1 + - Counter2-Child1: sum 2, avg 1, max 1, min 1 + node2: + - Counter1: sum 2, avg 1, max 1, min 1 + - Counter1-Child1: sum 2, avg 1, max 1, min 1 + - Counter2: sum 1, avg 1, max 1, min 1 + - Counter2-Child1: sum 1, avg 1, max 1, min 1 + * + * */ + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java index 38d168950e6..977a54db6d7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java @@ -25,6 +25,8 @@ import org.apache.doris.thrift.TUnit; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.Test; @@ -34,6 +36,7 @@ import java.util.HashMap; import java.util.Set; public class RuntimeProfileTest { + private static final Logger LOG = LogManager.getLogger(RuntimeProfileTest.class); @Test public void testSortChildren() { @@ -110,9 +113,16 @@ public class RuntimeProfileTest { public void testUpdate() throws IOException { RuntimeProfile profile = new RuntimeProfile("REAL_ROOT"); /* the profile tree - * ROOT(time=5s info[key=value]) - * A(time=2s) B(time=1s info[BInfo1=BValu1;BInfo2=BValue2]) - * A_SON(time=10ms counter[counterA1=1; counterA2=2; counterA1Son=3]) + REAL_ROOT:(ExecTime: 3sec0ms) + - key: value + A:(ExecTime: 1sec0ms) + ASON:(ExecTime: 10.0ms) + - counterA1: 1 + - counterA1Son: 3 + - counterA2: 1.18 MB + B:(ExecTime: 1sec0ms) + - BInfo2: BValue2 + - BInfo1: BValue1 */ TRuntimeProfileTree tprofileTree = new TRuntimeProfileTree(); TRuntimeProfileNode tnodeRoot = new TRuntimeProfileNode(); @@ -171,5 +181,7 @@ public class RuntimeProfileTest { StringBuilder builder = new StringBuilder(); profile.computeTimeInProfile(); profile.prettyPrint(builder, ""); + + LOG.info("testUpdate profile:\n{}", builder.toString()); } } diff --git a/gensrc/thrift/RuntimeProfile.thrift b/gensrc/thrift/RuntimeProfile.thrift index dd4a936ffdb..764db39f7d2 100644 --- a/gensrc/thrift/RuntimeProfile.thrift +++ b/gensrc/thrift/RuntimeProfile.thrift @@ -54,7 +54,8 @@ struct TRuntimeProfileNode { 9: required i64 timestamp - 10: optional bool is_sink + // Deprecated. + 10: optional bool deprecated_is_sink } // A flattened tree of runtime profiles, obtained by an --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org