This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 04f00ee07e995a64163bd23dfe4751e93f8b091d Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon May 27 10:42:04 2024 +0800 [fix](load) fix wrong assert and cancel load error (#35352) 1. Remove the wrong `Preconditions.checkArgument(params.isSetDetailedReport())` assertion in `Coordinator.updateFragmentExecStatus`, which failed with: ``` java.lang.IllegalArgumentException: null at com.google.common.base.Preconditions.checkArgument(Preconditions.java:129) ~[guava-32.1.2-jre.jar:?] at org.apache.doris.qe.Coordinator.updateFragmentExecStatus(Coordinator.java:2590) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.qe.QeProcessorImpl.reportExecStatus(QeProcessorImpl.java:253) ~[doris-fe.jar:1.2-SNAPSHOT] at org.apache.doris.service.FrontendServiceImpl.reportExecStatus(FrontendServiceImpl.java:960) ~[doris-fe.jar:1.2-SNAPSHOT] at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_352-352] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_352-352] at org.apache.doris.service.FeServer.lambda$start$0(FeServer.java:60) ~[doris-fe.jar:1.2-SNAPSHOT] at com.sun.proxy.$Proxy26.reportExecStatus(Unknown Source) ~[?:?] at org.apache.doris.thrift.FrontendService$Processor$reportExecStatus.getResult(FrontendService.java:3632) ~[fe-common-1.2-SNAPSHOT.jar:1.2-SNAPSHOT] at org.apache.doris.thrift.FrontendService$Processor$reportExecStatus.getResult(FrontendService.java:3612) ~[fe-common-1.2-SNAPSHOT.jar:1.2-SNAPSHOT] at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) ~[libthrift-0.16.0.jar:0.16.0] at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:38) ~[libthrift-0.16.0.jar:0.16.0] at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:250) ~[libthrift-0.16.0.jar:0.16.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352-352] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352-352] at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_352-352] ``` 2. 
Fix being unable to cancel a load job: `CANCEL LOAD` now falls back to the load manager only when the job manager throws `JobException`. --- .../main/java/org/apache/doris/qe/Coordinator.java | 1 - .../main/java/org/apache/doris/qe/DdlExecutor.java | 8 +++-- .../broker_load/test_multi_table_load.groovy | 34 +++++++++++++++++++++- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 284f0c180c5..bab1c1decbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2601,7 +2601,6 @@ public class Coordinator implements CoordInterface { icebergCommitDataFunc.accept(params.getIcebergCommitDatas()); } - Preconditions.checkArgument(params.isSetDetailedReport()); if (ctx.done) { if (LOG.isDebugEnabled()) { LOG.debug("Query {} fragment {} is marked done", diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 94257ac90ce..43e176d8adf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -136,6 +136,7 @@ import org.apache.doris.cloud.load.CopyJob; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.job.exception.JobException; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.JobState; @@ -198,8 +199,11 @@ public class DdlExecutor { } else if (ddlStmt instanceof CancelLoadStmt) { CancelLoadStmt cs = (CancelLoadStmt) ddlStmt; // cancel all - env.getJobManager().cancelLoadJob(cs); - env.getLoadManager().cancelLoadJob(cs); + try { + env.getJobManager().cancelLoadJob(cs); + } catch (JobException e) { + env.getLoadManager().cancelLoadJob(cs); + } } else if (ddlStmt instanceof CreateRoutineLoadStmt) { 
env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof PauseRoutineLoadStmt) { diff --git a/regression-test/suites/load_p0/broker_load/test_multi_table_load.groovy b/regression-test/suites/load_p0/broker_load/test_multi_table_load.groovy index e0541f09ce8..b623eeb7cf0 100644 --- a/regression-test/suites/load_p0/broker_load/test_multi_table_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_multi_table_load.groovy @@ -163,4 +163,36 @@ suite("test_multi_table_load", "load_p0") { qt_sql """ SELECT COUNT(*) FROM ${tableName} """ } -} \ No newline at end of file + + // test cancel load + def tuple = data_desces[0] + def data_desc = tuple.get(0) + def load_result = tuple.get(1) + + def label = UUID.randomUUID().toString().replace("-", "0") + def sql_str = """ + LOAD LABEL $label ( + $data_desc + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ) + properties( + "use_new_load_scan_node" = "true", + "max_filter_ratio" = "1.0" + ) + """ + try { + sql """${sql_str}""" + sql """cancel load where label = "$label";""" + } catch (Exception e) { + logger.info("xx cancel load failed", e) + assertFalse(true); + } + + String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ + assertEquals("CANCELLED", result[0][2]) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org