This is an automated email from the ASF dual-hosted git repository. hellostephen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 093e3a72064 [regression](test) check load channel and load stream before quit (#38293) 093e3a72064 is described below commit 093e3a72064b302d517b35f2a620c607c9a73a7b Author: Dongyang Li <hello_step...@qq.com> AuthorDate: Thu Jul 25 12:52:01 2024 +0800 [regression](test) check load channel and load stream before quit (#38293) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: stephen <hello-step...@qq.com> --- .../check_before_quit/check_before_quit.groovy | 92 +++++++++++++++++++--- 1 file changed, 80 insertions(+), 12 deletions(-) diff --git a/regression-test/suites/check_before_quit/check_before_quit.groovy b/regression-test/suites/check_before_quit/check_before_quit.groovy index b5b7803cb16..a09371b4c99 100644 --- a/regression-test/suites/check_before_quit/check_before_quit.groovy +++ b/regression-test/suites/check_before_quit/check_before_quit.groovy @@ -22,7 +22,8 @@ suite("check_before_quit", "nonConcurrent,p0") { List<Object> beRow = be_result.get(0) String beHost = beRow.get(1).toString().trim() String bePort = beRow.get(4).toString().trim() - logger.info("get be host and port:" + beHost + " " + bePort) + String beBrpcPort = beRow.get(5).toString().trim() + logger.info("get be host and port:" + beHost + " " + bePort + ", BrpcPort:" + beBrpcPort) //NOTE: this suite is used to check whether workload group's query queue works correctly when all query finished long beginTime = System.currentTimeMillis(); @@ -95,24 +96,55 @@ suite("check_before_quit", "nonConcurrent,p0") { } } return metricValues - } + } } + + def getVar = { String data, String name -> { + def varValues = [] + data.eachLine { line -> + line = line.trim() + + // Skip comment lines + if (line.startsWith('#')) return + + // Regular expression to match metric lines + def matcher = (line =~ /^(\w+)\s:\s(.+)$/) + + if (matcher) { + def varName = matcher[0][1] + def varValue = matcher[0][2] + if (varName == name) { + varValues << varValue + } + } + } + return varValues + } } - } beginTime = System.currentTimeMillis(); timeoutMs = 30 * 1000 // 30s clear = false - def command = "curl http://${beHost}:${bePort}/metrics" + def command_metrics = "curl http://${beHost}:${bePort}/metrics" + def command_vars = "curl http://${beHost}:${beBrpcPort}/vars" while ((System.currentTimeMillis() - beginTime) < timeoutMs) { clear = true - logger.info("executing command: ${command}") - def process = command.execute() - def outputStream = new StringBuffer() - def errorStream = new StringBuffer() - process.consumeProcessOutput(outputStream, errorStream) - def code = process.waitFor() - def metrics = outputStream.toString() - logger.info("Request BE metrics: code=" + code + ", err=" + errorStream.toString()) + logger.info("executing command: ${command_metrics}") + def process_metrics = command_metrics.execute() + def outputStream_metrics = new StringBuffer() + def errorStream_metrics = new StringBuffer() + process_metrics.consumeProcessOutput(outputStream_metrics, errorStream_metrics) + def code_metrics = process_metrics.waitFor() + def metrics = outputStream_metrics.toString() + logger.info("Request BE metrics: code=" + code_metrics + ", err=" + errorStream_metrics.toString()) + + logger.info("executing command: ${command_vars}") + def process_vars = command_vars.execute() + def outputStream_vars = new StringBuffer() + def errorStream_vars = new StringBuffer() + process_vars.consumeProcessOutput(outputStream_vars, errorStream_vars) + def code_vars = process_vars.waitFor() + def vars = outputStream_vars.toString() + logger.info("Request BE vars: code=" + code_vars + ", err=" + errorStream_vars.toString()) def hasSpillData = getPrometheusMetric(metrics, "doris_be_spill_disk_has_spill_data") logger.info("has spill temporary files :${hasSpillData}") @@ -141,6 +173,42 @@ suite("check_before_quit", "nonConcurrent,p0") { } } + def doris_be_load_channel_count = getPrometheusMetric(metrics, "doris_be_load_channel_count") + logger.info("doris_be_load_channel_count :${doris_be_load_channel_count}") + for (int i = 0; i < doris_be_load_channel_count.size(); i++) { + if (0 != Integer.valueOf(doris_be_load_channel_count.get(i))) { + clear = false; + break; + } + } + + def load_stream_count = getVar(vars, "load_stream_count") + logger.info("load_stream_count :${load_stream_count}") + for (int i = 0; i < load_stream_count.size(); i++) { + if (0 != Integer.valueOf(load_stream_count.get(i))) { + clear = false; + break; + } + } + + def load_stream_writer_count = getVar(vars, "load_stream_writer_count") + logger.info("load_stream_writer_count :${load_stream_writer_count}") + for (int i = 0; i < load_stream_writer_count.size(); i++) { + if (0 != Integer.valueOf(load_stream_writer_count.get(i))) { + clear = false; + break; + } + } + + def load_stream_file_writer_count = getVar(vars, "load_stream_file_writer_count") + logger.info("load_stream_file_writer_count :${load_stream_file_writer_count}") + for (int i = 0; i < load_stream_file_writer_count.size(); i++) { + if (0 != Integer.valueOf(load_stream_file_writer_count.get(i))) { + clear = false; + break; + } + } + if (clear) { break } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org