This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 708b6c5 [RoutineLoad] Support pause or resume all routine load jobs (#6394) 708b6c5 is described below commit 708b6c529e83e9f0b70bd9d5dd950721cb8a77a2 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Wed Aug 11 16:38:06 2021 +0800 [RoutineLoad] Support pause or resume all routine load jobs (#6394) 1. PAUSE ALL ROUTINE LOAD; 2. RESUME ALL ROUTINE LOAD; --- .../Data Manipulation/PAUSE ROUTINE LOAD.md | 11 ++- .../Data Manipulation/RESUME ROUTINE LOAD.md | 11 ++- .../Data Manipulation/PAUSE ROUTINE LOAD.md | 6 +- .../Data Manipulation/RESUME ROUTINE LOAD.md | 7 +- fe/fe-core/src/main/cup/sql_parser.cup | 8 ++ .../doris/analysis/PauseRoutineLoadStmt.java | 25 ++++- .../doris/analysis/ResumeRoutineLoadStmt.java | 25 ++++- .../doris/load/routineload/RoutineLoadManager.java | 101 ++++++++++++++++----- .../load/routineload/RoutineLoadManagerTest.java | 64 +++++++++++++ 9 files changed, 221 insertions(+), 37 deletions(-) diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md index 7b98035..92c157a 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md @@ -27,9 +27,14 @@ under the License. # PAUSE ROUTINE LOAD ## example -1. Suspend the routine import operation named test 1. +1. Pause routine load named test1; -PAUSE ROUTINE LOAD FOR test1; + PAUSE ROUTINE LOAD FOR test1; + +2. Pause all running routine load; + + PAUSE ALL ROUTINE LOAD; ## keyword -PAUSE,ROUTINE,LOAD + + PAUSE,ALL,ROUTINE,LOAD diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md index 78d4755..26a499b 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md @@ -27,9 +27,14 @@ under the License. # RESUME ROUTINE LOAD ## example -1. Restore the routine import job named test 1. +1. Resume routine load job named test1. -RESUME ROUTINE LOAD FOR test1; + RESUME ROUTINE LOAD FOR test1; + +2. Resume all paused routine load job. + + RESUME ALL ROUTINE LOAD; ## keyword -RESUME,ROUTINE,LOAD + + RESUME,ALL,ROUTINE,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md index 85f8c86..4a88095 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/PAUSE ROUTINE LOAD.md @@ -31,6 +31,10 @@ under the License. PAUSE ROUTINE LOAD FOR test1; +2. 暂停所有正在运行的例行导入作业 + + PAUSE ALL ROUTINE LOAD; + ## keyword - PAUSE,ROUTINE,LOAD + PAUSE,ALL,ROUTINE,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md index 9a3730b..5afa216 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/RESUME ROUTINE LOAD.md @@ -31,6 +31,11 @@ under the License. RESUME ROUTINE LOAD FOR test1; +2. 恢复所有暂停中的例行导入作业。 + + RESUME ALL ROUTINE LOAD; + ## keyword - RESUME,ROUTINE,LOAD + + RESUME,ALL,ROUTINE,LOAD diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 2981e84..d29bee8 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1757,6 +1757,10 @@ pause_routine_load_stmt ::= {: RESULT = new PauseRoutineLoadStmt(jobLabel); :} + | KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD + {: + RESULT = new PauseRoutineLoadStmt(null); + :} ; resume_routine_load_stmt ::= @@ -1764,6 +1768,10 @@ resume_routine_load_stmt ::= {: RESULT = new ResumeRoutineLoadStmt(jobLabel); :} + | KW_RESUME KW_ALL KW_ROUTINE KW_LOAD + {: + RESULT = new ResumeRoutineLoadStmt(null); + :} ; stop_routine_load_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java index 19ae0c2..49f07f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseRoutineLoadStmt.java @@ -17,9 +17,13 @@ package org.apache.doris.analysis; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import com.google.common.base.Strings; + /* Pause routine load by name @@ -29,22 +33,35 @@ import org.apache.doris.common.UserException; public class PauseRoutineLoadStmt extends DdlStmt { private final LabelName labelName; + private String db; public PauseRoutineLoadStmt(LabelName labelName) { this.labelName = labelName; } + public boolean isAll() { + return labelName == null; + } + public String getName() { return labelName.getLabelName(); } public String getDbFullName(){ - return labelName.getDbName(); + return db; } @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - labelName.analyze(analyzer); + if (labelName != null) { + labelName.analyze(analyzer); + db = labelName.getDbName(); + } else { + if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java index ca31856..888d032 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeRoutineLoadStmt.java @@ -17,9 +17,13 @@ package org.apache.doris.analysis; -import org.apache.doris.common.AnalysisException; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import com.google.common.base.Strings; + /* Resume routine load job by name @@ -29,22 +33,35 @@ import org.apache.doris.common.UserException; public class ResumeRoutineLoadStmt extends DdlStmt{ private final LabelName labelName; + private String db; public ResumeRoutineLoadStmt(LabelName labelName) { this.labelName = labelName; } + public boolean isAll() { + return labelName == null; + } + public String getName() { return labelName.getLabelName(); } public String getDbFullName() { - return labelName.getDbName(); + return db; } @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - labelName.analyze(analyzer); + if (labelName != null) { + labelName.analyze(analyzer); + db = labelName.getDbName(); + } else { + if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 045fbd1..7b7e62f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -232,34 +232,93 @@ public class RoutineLoadManager implements Writable { return routineLoadJob; } + // get all jobs which state is not in final state from specified database + public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName) + throws MetaNotFoundException, DdlException, AnalysisException { + + List<RoutineLoadJob> result = Lists.newArrayList(); + Database database = Catalog.getCurrentCatalog().getDb(dbName); + if (database == null) { + ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName); + } + long dbId = database.getId(); + Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId); + if (jobMap == null) { + // return empty result + return result; + } + + for (List<RoutineLoadJob> jobs : jobMap.values()) { + for (RoutineLoadJob job : jobs) { + if (!job.getState().isFinalState()) { + String tableName = job.getTableName(); + if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), + dbName, tableName, PrivPredicate.LOAD)) { + continue; + } + result.add(job); + } + } + } + + return result; + } + public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt) throws UserException { - RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), - pauseRoutineLoadStmt.getName()); + List<RoutineLoadJob> jobs = Lists.newArrayList(); + if (pauseRoutineLoadStmt.isAll()) { + jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName()); + } else { + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(), + pauseRoutineLoadStmt.getName()); + jobs.add(routineLoadJob); + } - routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, - new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, - "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"), - false /* not replay */); - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state", - routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", + for (RoutineLoadJob routineLoadJob : jobs) { + try { + routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED, + new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, + "User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"), + false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state", + routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg", "routine load job has been paused by user").build()); + } catch (UserException e) { + LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e); + continue; + } + } } public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException { - RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(), - resumeRoutineLoadStmt.getName()); - - routineLoadJob.jobStatistic.errorRowsAfterResumed = 0; - routineLoadJob.autoResumeCount = 0; - routineLoadJob.firstResumeTimestamp = 0; - routineLoadJob.autoResumeLock = false; - routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("current_state", routineLoadJob.getState()) - .add("user", ConnectContext.get().getQualifiedUser()) - .add("msg", "routine load job has been resumed by user") - .build()); + + List<RoutineLoadJob> jobs = Lists.newArrayList(); + if (resumeRoutineLoadStmt.isAll()) { + jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName()); + } else { + RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(), + resumeRoutineLoadStmt.getName()); + jobs.add(routineLoadJob); + } + + for (RoutineLoadJob routineLoadJob : jobs) { + try { + routineLoadJob.jobStatistic.errorRowsAfterResumed = 0; + routineLoadJob.autoResumeCount = 0; + routineLoadJob.firstResumeTimestamp = 0; + routineLoadJob.autoResumeLock = false; + routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) + .add("current_state", routineLoadJob.getState()) + .add("user", ConnectContext.get().getQualifiedUser()) + .add("msg", "routine load job has been resumed by user") + .build()); + } catch (UserException e) { + LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e); + continue; + } + } } public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index fb8fe9b..f66b197 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -913,4 +913,68 @@ public class RoutineLoadManagerTest { Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState()); } + + @Test + public void testPauseAndResumeAllRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt, + @Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt, + @Mocked Catalog catalog, + @Mocked Database database, + @Mocked PaloAuth paloAuth, + @Mocked ConnectContext connectContext) throws UserException { + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap(); + Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap(); + + List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList(); + RoutineLoadJob routineLoadJob1 = new KafkaRoutineLoadJob(); + Deencapsulation.setField(routineLoadJob1, "id", 1000L); + routineLoadJobList1.add(routineLoadJob1); + + List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList(); + RoutineLoadJob routineLoadJob2 = new KafkaRoutineLoadJob(); + Deencapsulation.setField(routineLoadJob2, "id", 1002L); + routineLoadJobList2.add(routineLoadJob2); + + nameToRoutineLoadJob.put("job1", routineLoadJobList1); + nameToRoutineLoadJob.put("job2", routineLoadJobList2); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState()); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState()); + + new Expectations() { + { + pauseRoutineLoadStmt.isAll(); + minTimes = 0; + result = true; + pauseRoutineLoadStmt.getDbFullName(); + minTimes = 0; + result = ""; + catalog.getDb(""); + minTimes = 0; + result = database; + database.getId(); + minTimes = 0; + result = 1L; + catalog.getAuth(); + minTimes = 0; + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any); + minTimes = 0; + result = true; + resumeRoutineLoadStmt.isAll(); + minTimes = 0; + result = true; + } + }; + + routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt); + Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob1.getState()); + Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob2.getState()); + + routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState()); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org