This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1a61cb4868c [feat](job)Implementing Job in Nereids (#41391) (#41861)
1a61cb4868c is described below

commit 1a61cb4868cccbaf93ed5d5b0433440878a71b3d
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Fri Oct 18 11:56:08 2024 +0800

    [feat](job)Implementing Job in Nereids (#41391) (#41861)
    
    bp #41391
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  25 +-
 .../org/apache/doris/analysis/CreateJobStmt.java   |   1 +
 .../doris/job/extensions/insert/InsertJob.java     |   4 +-
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  28 +++
 .../apache/doris/nereids/trees/plans/PlanType.java |   1 +
 .../trees/plans/commands/CreateJobCommand.java     |  73 ++++++
 .../trees/plans/commands/info/CreateJobInfo.java   | 263 +++++++++++++++++++++
 .../commands/insert/InsertIntoTableCommand.java    |  14 +-
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 .../data/job_p0/job_meta/job_query_test.out        |   4 +-
 .../suites/job_p0/test_base_insert_job.groovy      |  59 +++--
 11 files changed, 444 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 94f00f0efd0..41c35cda988 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -50,6 +50,7 @@ statementBase
     | supportedCreateStatement          #supportedCreateStatementAlias
     | supportedAlterStatement           #supportedAlterStatementAlias
     | materializedViewStatement         #materializedViewStatementAlias
+    | supportedJobStatement              #supportedJobStatementAlias
     | constraintStatement               #constraintStatementAlias
     | supportedDropStatement            #supportedDropStatementAlias
     | unsupportedStatement              #unsupported
@@ -102,7 +103,17 @@ materializedViewStatement
     | CANCEL MATERIALIZED VIEW TASK taskId=INTEGER_VALUE ON 
mvName=multipartIdentifier          #cancelMTMVTask
     | SHOW CREATE MATERIALIZED VIEW mvName=multipartIdentifier                 
                 #showCreateMTMV
     ;
-
+supportedJobStatement
+    : CREATE JOB label=multipartIdentifier ON SCHEDULE
+        (
+            (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
+            (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
+            (ENDS endsTime=STRING_LITERAL)?)
+            |
+            (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
+        commentSpec?
+        DO supportedDmlStatement                                               
                #createScheduledJob                                             
                       
+   ;
 constraintStatement
     : ALTER TABLE table=multipartIdentifier
         ADD CONSTRAINT constraintName=errorCapturingIdentifier
@@ -412,16 +423,8 @@ unsupportedCleanStatement
     ;
 
 unsupportedJobStatement
-    : CREATE JOB label=multipartIdentifier ON SCHEDULE
-        (
-            (EVERY timeInterval=INTEGER_VALUE timeUnit=identifier
-            (STARTS (startTime=STRING_LITERAL | CURRENT_TIMESTAMP))?
-            (ENDS endsTime=STRING_LITERAL)?)
-            |
-            (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
-        commentSpec?
-        DO statement                                                           
     #createJob
-    | PAUSE JOB wildWhere?                                                     
     #pauseJob
+
+    : PAUSE JOB wildWhere?                                                     
     #pauseJob
     | DROP JOB (IF EXISTS)? wildWhere?                                         
     #dropJob
     | RESUME JOB wildWhere?                                                    
     #resumeJob
     | CANCEL TASK wildWhere?                                                   
     #cancelJobTask
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index 0fff1e09749..8babb665299 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -60,6 +60,7 @@ import org.apache.commons.lang3.StringUtils;
  * quantity { DAY | HOUR | MINUTE |
  * WEEK | SECOND }
  */
+@Deprecated
 @Slf4j
 public class CreateJobStmt extends DdlStmt implements NotFallbackInParser {
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 43f43ba8699..487591efc04 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -299,7 +299,9 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
     @Override
     public void cancelAllTasks() throws JobException {
         try {
-            checkAuth("CANCEL LOAD");
+            if 
(getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
+                checkAuth("CANCEL LOAD");
+            }
             super.cancelAllTasks();
             this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user 
cancel");
         } catch (DdlException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 3f84f6b9e52..fbee3b858ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -371,6 +371,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.CallCommand;
 import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.Constraint;
+import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
@@ -408,6 +409,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableLikeInfo;
@@ -559,6 +561,32 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         return withExplain(plan, ctx.explain());
     }
 
+    @Override
+    public LogicalPlan 
visitCreateScheduledJob(DorisParser.CreateScheduledJobContext ctx) {
+        Optional<String> label = ctx.label == null ? Optional.empty() : 
Optional.of(ctx.label.getText());
+        Optional<String> atTime = ctx.atTime == null ? Optional.empty() : 
Optional.of(ctx.atTime.getText());
+        Optional<Boolean> immediateStartOptional = ctx.CURRENT_TIMESTAMP() == 
null ? Optional.of(false) :
+                Optional.of(true);
+        Optional<String> startTime = ctx.startTime == null ? Optional.empty() 
: Optional.of(ctx.startTime.getText());
+        Optional<String> endsTime = ctx.endsTime == null ? Optional.empty() : 
Optional.of(ctx.endsTime.getText());
+        Optional<Long> interval = ctx.timeInterval == null ? Optional.empty() :
+                Optional.of(Long.valueOf(ctx.timeInterval.getText()));
+        Optional<String> intervalUnit = ctx.timeUnit == null ? 
Optional.empty() : Optional.of(ctx.timeUnit.getText());
+        String comment =
+                visitCommentSpec(ctx.commentSpec());
+        String executeSql = getOriginSql(ctx.supportedDmlStatement());
+        CreateJobInfo createJobInfo = new CreateJobInfo(label, atTime, 
interval, intervalUnit, startTime,
+                endsTime, immediateStartOptional, comment, executeSql);
+        return new CreateJobCommand(createJobInfo);
+    }
+
+    @Override
+    public String visitCommentSpec(DorisParser.CommentSpecContext ctx) {
+        String commentSpec = ctx == null ? "''" : 
ctx.STRING_LITERAL().getText();
+        return
+                
LogicalPlanBuilderAssistant.escapeBackSlash(commentSpec.substring(1, 
commentSpec.length() - 1));
+    }
+
     @Override
     public LogicalPlan visitInsertTable(InsertTableContext ctx) {
         boolean isOverwrite = ctx.INTO() == null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index f3587b37921..245d9ea2d69 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -143,6 +143,7 @@ public enum PlanType {
     SELECT_INTO_OUTFILE_COMMAND,
     UPDATE_COMMAND,
     CREATE_MTMV_COMMAND,
+    CREATE_JOB_COMMAND,
     ALTER_MTMV_COMMAND,
     ADD_CONSTRAINT_COMMAND,
     DROP_CONSTRAINT_COMMAND,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
new file mode 100644
index 00000000000..fecd457ada5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.StmtType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * syntax:
+ * CREATE
+ * [DEFINER = user]
+ * JOB
+ * event_name
+ * ON SCHEDULE schedule
+ * [COMMENT 'string']
+ * DO event_body;
+ * schedule: {
+ * [STREAMING] AT timestamp
+ * | EVERY interval
+ * [STARTS timestamp ]
+ * [ENDS timestamp ]
+ * }
+ * interval:
+ * quantity { DAY | HOUR | MINUTE |
+ * WEEK | SECOND }
+ */
+public class CreateJobCommand extends Command implements ForwardWithSync {
+
+    private CreateJobInfo createJobInfo;
+
+    public CreateJobCommand(CreateJobInfo jobInfo) {
+        super(PlanType.CREATE_JOB_COMMAND);
+        this.createJobInfo = jobInfo;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        AbstractJob job = createJobInfo.analyzeAndBuildJobInfo(ctx);
+        Env.getCurrentEnv().getJobManager().registerJob(job);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitCreateJobCommand(this, context);
+    }
+
+    @Override
+    public StmtType stmtType() {
+        return StmtType.CREATE;
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
new file mode 100644
index 00000000000..6cef7ee89ec
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateJobInfo.java
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
+import org.apache.doris.job.common.IntervalUnit;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+
+import java.util.Optional;
+
+/**
+ * Build job info and analyze the SQL statement to create a job.
+ */
+public class CreateJobInfo {
+
+    // exclude job name prefix, which is used by inner job
+    private static final String excludeJobNamePrefix = "inner_";
+
+    private final Optional<String> labelNameOptional;
+
+    private final Optional<String> onceJobStartTimestampOptional;
+
+    private final Optional<Long> intervalOptional;
+
+    private final Optional<String> intervalTimeUnitOptional;
+
+    private final Optional<String> startsTimeStampOptional;
+
+    private final Optional<String> endsTimeStampOptional;
+
+    private final Optional<Boolean> immediateStartOptional;
+
+    private final String comment;
+
+    private final String executeSql;
+
+    /**
+     * Constructor for CreateJobInfo.
+     *
+     * @param labelNameOptional             Job name.
+     * @param onceJobStartTimestampOptional Start time for a one-time job.
+     * @param intervalOptional              Interval for a recurring job.
+     * @param intervalTimeUnitOptional      Interval time unit for a recurring 
job.
+     * @param startsTimeStampOptional       Start time for a recurring job.
+     * @param endsTimeStampOptional         End time for a recurring job.
+     * @param immediateStartOptional        Immediate start for a job.
+     * @param comment                       Comment for the job.
+     * @param executeSql                    Original SQL statement.
+     */
+    public CreateJobInfo(Optional<String> labelNameOptional, Optional<String> 
onceJobStartTimestampOptional,
+                         Optional<Long> intervalOptional, Optional<String> 
intervalTimeUnitOptional,
+                         Optional<String> startsTimeStampOptional, 
Optional<String> endsTimeStampOptional,
+                         Optional<Boolean> immediateStartOptional, String 
comment, String executeSql) {
+        this.labelNameOptional = labelNameOptional;
+        this.onceJobStartTimestampOptional = onceJobStartTimestampOptional;
+        this.intervalOptional = intervalOptional;
+        this.intervalTimeUnitOptional = intervalTimeUnitOptional;
+        this.startsTimeStampOptional = startsTimeStampOptional;
+        this.endsTimeStampOptional = endsTimeStampOptional;
+        this.immediateStartOptional = immediateStartOptional;
+        this.comment = comment;
+        this.executeSql = executeSql;
+
+    }
+
+    /**
+     * Analyzes the provided SQL statement and builds the job information.
+     *
+     * @param ctx Connect context.
+     * @return AbstractJob instance.
+     * @throws UserException If there is an error during SQL analysis or job 
creation.
+     */
+    public AbstractJob analyzeAndBuildJobInfo(ConnectContext ctx) throws 
UserException {
+        checkAuth();
+        if (labelNameOptional.orElseThrow(() -> new 
AnalysisException("labelName is null")).isEmpty()) {
+            throw new AnalysisException("Job name can not be empty");
+        }
+
+        String jobName = labelNameOptional.get();
+        checkJobName(jobName);
+        String dbName = ctx.getDatabase();
+
+        Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        // check its insert stmt,currently only support insert stmt
+        JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
+        JobExecuteType executeType = intervalOptional.isPresent() ? 
JobExecuteType.RECURRING : JobExecuteType.ONE_TIME;
+        jobExecutionConfiguration.setExecuteType(executeType);
+        TimerDefinition timerDefinition = new TimerDefinition();
+
+        if (executeType.equals(JobExecuteType.ONE_TIME)) {
+            buildOnceJob(timerDefinition, jobExecutionConfiguration);
+        } else {
+            buildRecurringJob(timerDefinition, jobExecutionConfiguration);
+        }
+        jobExecutionConfiguration.setTimerDefinition(timerDefinition);
+        return analyzeAndCreateJob(executeSql, dbName, 
jobExecutionConfiguration);
+    }
+
+    /**
+     * Builds a TimerDefinition for a once-job.
+     *
+     * @param timerDefinition           Timer definition to be built.
+     * @param jobExecutionConfiguration Job execution configuration.
+     * @throws AnalysisException If the job is not configured correctly.
+     */
+    private void buildOnceJob(TimerDefinition timerDefinition,
+                              JobExecutionConfiguration 
jobExecutionConfiguration) throws AnalysisException {
+        if (immediateStartOptional.isPresent() && 
Boolean.TRUE.equals(immediateStartOptional.get())) {
+            jobExecutionConfiguration.setImmediate(true);
+            timerDefinition.setStartTimeMs(System.currentTimeMillis());
+            return;
+        }
+
+        // Ensure start time is provided for once jobs.
+        String startTime = onceJobStartTimestampOptional.orElseThrow(()
+                -> new AnalysisException("Once time job must set start time"));
+        
timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(startTime));
+    }
+
+    /**
+     * Builds a TimerDefinition for a recurring job.
+     *
+     * @param timerDefinition           Timer definition to be built.
+     * @param jobExecutionConfiguration Job execution configuration.
+     * @throws AnalysisException If the job is not configured correctly.
+     */
+    private void buildRecurringJob(TimerDefinition timerDefinition,
+                                   JobExecutionConfiguration 
jobExecutionConfiguration) throws AnalysisException {
+        // Ensure interval is provided for recurring jobs.
+        long interval = intervalOptional.orElseThrow(()
+                -> new AnalysisException("Interval must be set for recurring 
job"));
+        timerDefinition.setInterval(interval);
+
+        // Ensure interval time unit is provided for recurring jobs.
+        String intervalTimeUnit = intervalTimeUnitOptional.orElseThrow(()
+                -> new AnalysisException("Interval time unit must be set for 
recurring job"));
+        IntervalUnit intervalUnit = 
IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
+        if (intervalUnit == null) {
+            throw new AnalysisException("Invalid interval time unit: " + 
intervalTimeUnit);
+        }
+
+        // Check if interval unit is second and disable if not in test mode.
+        if (intervalUnit.equals(IntervalUnit.SECOND) && 
!Config.enable_job_schedule_second_for_test) {
+            throw new AnalysisException("Interval time unit can not be second 
in production mode");
+        }
+
+        timerDefinition.setIntervalUnit(intervalUnit);
+
+        // Set end time if provided.
+        endsTimeStampOptional.ifPresent(s -> 
timerDefinition.setEndTimeMs(stripQuotesAndParseTimestamp(s)));
+
+        // Set immediate start if configured.
+        if (immediateStartOptional.isPresent() && 
Boolean.TRUE.equals(immediateStartOptional.get())) {
+            jobExecutionConfiguration.setImmediate(true);
+            // Avoid immediate re-scheduling by setting start time slightly in 
the past.
+            timerDefinition.setStartTimeMs(System.currentTimeMillis() - 100);
+            return;
+        }
+        // Set start time if provided.
+        startsTimeStampOptional.ifPresent(s -> 
timerDefinition.setStartTimeMs(stripQuotesAndParseTimestamp(s)));
+    }
+
+    protected static void checkAuth() throws AnalysisException {
+        if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN");
+        }
+    }
+
+    /**
+     * Analyzes the provided SQL statement and creates an appropriate job 
based on the parsed logical plan.
+     * Currently, only "InsertIntoTableCommand" is supported for job creation.
+     *
+     * @param sql                       the SQL statement to be analyzed
+     * @param currentDbName             the current database name where the 
SQL statement will be executed
+     * @param jobExecutionConfiguration the configuration for job execution
+     * @return an instance of AbstractJob corresponding to the SQL statement
+     * @throws UserException if there is an error during SQL analysis or job 
creation
+     */
+    private AbstractJob analyzeAndCreateJob(String sql, String currentDbName,
+                                            JobExecutionConfiguration 
jobExecutionConfiguration) throws UserException {
+        NereidsParser parser = new NereidsParser();
+        LogicalPlan logicalPlan = parser.parseSingle(sql);
+        if (logicalPlan instanceof InsertIntoTableCommand) {
+            InsertIntoTableCommand insertIntoTableCommand = 
(InsertIntoTableCommand) logicalPlan;
+            try {
+                insertIntoTableCommand.initPlan(ConnectContext.get(), 
ConnectContext.get().getExecutor(), false);
+                return new InsertJob(labelNameOptional.get(),
+                        JobStatus.RUNNING,
+                        currentDbName,
+                        comment,
+                        ConnectContext.get().getCurrentUserIdentity(),
+                        jobExecutionConfiguration,
+                        System.currentTimeMillis(),
+                        sql);
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage());
+            }
+        } else {
+            throw new AnalysisException("Not support this sql : " + sql + " 
Command class is "
+                    + logicalPlan.getClass().getName() + ".");
+        }
+    }
+
+    private void checkJobName(String jobName) throws AnalysisException {
+        if (Strings.isNullOrEmpty(jobName)) {
+            throw new AnalysisException("job name can not be null");
+        }
+        if (jobName.startsWith(excludeJobNamePrefix)) {
+            throw new AnalysisException("job name can not start with " + 
excludeJobNamePrefix);
+        }
+    }
+
+    /**
+     * Strips quotes from the input string and parses it to a timestamp.
+     *
+     * @param str The input string potentially enclosed in single or double 
quotes.
+     * @return The parsed timestamp as a long value, or -1L if the input is 
null or empty.
+     */
+    public static Long stripQuotesAndParseTimestamp(String str) {
+        if (str == null || str.isEmpty()) {
+            return -1L;
+        }
+        if (str.startsWith("'") && str.endsWith("'")) {
+            str = str.substring(1, str.length() - 1);
+        } else if (str.startsWith("\"") && str.endsWith("\"")) {
+            str = str.substring(1, str.length() - 1);
+        }
+        return TimeUtils.timeStringToLong(str.trim());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 74f75d2d7d5..68718de0f86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -123,12 +123,20 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         runInternal(ctx, executor);
     }
 
+    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
+        return initPlan(ctx, executor, true);
+    }
+
     /**
      * This function is used to generate the plan for Nereids.
      * There are some load functions that only need to the plan, such as 
stream_load.
      * Therefore, this section will be presented separately.
+     * @param needBeginTransaction whether to start a transaction.
+     *       For external uses such as creating a job, only basic analysis is 
needed without starting a transaction,
+     *       in which case this can be set to false.
      */
-    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
+    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor 
executor,
+                                           boolean needBeginTransaction) 
throws Exception {
         TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
         // check auth
         if (!Env.getCurrentEnv().getAccessManager()
@@ -220,6 +228,10 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
                 // TODO: support other table types
                 throw new AnalysisException("insert into command only support 
[olap, hive, iceberg, jdbc] table");
             }
+            if (!needBeginTransaction) {
+                targetTableIf.readUnlock();
+                return insertExecutor;
+            }
             if (!insertExecutor.isEmptyInsert()) {
                 insertExecutor.beginTransaction();
                 insertExecutor.finalizeSink(planner.getFragments().get(0), 
sink, physicalSink);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index a180505d42e..92b5ecbd9e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -23,6 +23,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.AlterViewCommand;
 import org.apache.doris.nereids.trees.plans.commands.CallCommand;
 import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
 import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.CreateJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateProcedureCommand;
@@ -108,6 +109,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(createMTMVCommand, context);
     }
 
+    default R visitCreateJobCommand(CreateJobCommand createJobCommand, C 
context) {
+        return visitCommand(createJobCommand, context);
+    }
+
     default R visitAlterMTMVCommand(AlterMTMVCommand alterMTMVCommand, C 
context) {
         return visitCommand(alterMTMVCommand, context);
     }
diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out 
b/regression-test/data/job_p0/job_meta/job_query_test.out
index 1a2bfe0f9cd..2bfbb890aed 100644
--- a/regression-test/data/job_p0/job_meta/job_query_test.out
+++ b/regression-test/data/job_p0/job_meta/job_query_test.out
@@ -1,7 +1,7 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !select1 --
-JOB_ONETIME    ONE_TIME        AT 2052-03-18 00:00:00  insert into 
t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+JOB_ONETIME    ONE_TIME        AT 2052-03-18 00:00:00  insert into 
t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213')
 
 -- !select2 --
-JOB_RECURRING  RECURRING       EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert 
into t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+JOB_RECURRING  RECURRING       EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert 
into t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213')
 
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index be744427d88..19f4422d64f 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -26,6 +26,9 @@ suite("test_base_insert_job") {
     def tableName = "t_test_BASE_inSert_job"
     def jobName = "insert_recovery_test_base_insert_job"
     def jobMixedName = "Insert_recovery_Test_base_insert_job"
+    sql """
+      SET enable_fallback_to_original_planner=false;
+    """
     sql """drop table if exists `${tableName}` force"""
     sql """
         DROP JOB IF EXISTS where jobname =  '${jobName}'
@@ -70,27 +73,47 @@ suite("test_base_insert_job") {
         );
         """
     sql """
-       CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test' DO 
insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+        insert into ${tableName} values
+        ('2023-03-18', 1, 1)
+        """
+    sql """
+       CREATE JOB ${jobName}  ON SCHEDULE every 1 second   comment 'test' DO 
INSERT INTO ${tableName} (`timestamp`, `type`, `user_id`)
+        WITH
+            tbl_timestamp AS (
+                SELECT `timestamp` FROM ${tableName} WHERE user_id = 1
+            ),
+            tbl_type AS (
+                SELECT `type` FROM ${tableName} WHERE user_id = 1
+            ),
+            tbl_user_id AS (
+                SELECT `user_id` FROM ${tableName} WHERE user_id = 1
+            )
+        SELECT
+            tbl_timestamp.`timestamp`,
+            tbl_type.`type`,
+            tbl_user_id.`user_id`
+        FROM
+            tbl_timestamp, tbl_type, tbl_user_id;
     """
     Awaitility.await().atMost(30, SECONDS).until(
             {
                 def onceJob = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and 
ExecuteType='RECURRING' """
                 println(onceJob)
-                onceJob .size() == 1 && '1' <= onceJob.get(0).get(0)
-                
+                onceJob.size() == 1 && '1' <= onceJob.get(0).get(0)
+
             }
-            )
+    )
     sql """
         PAUSE JOB where jobname =  '${jobName}'
     """
     def tblDatas = sql """select * from ${tableName}"""
     println tblDatas
-    assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some 
times 3 records
+    assert tblDatas.size() >= 2 //at least 2 records
     def pauseJobId = sql """select id from jobs("type"="insert") where 
Name='${jobName}'"""
     def taskStatus = sql """select status from tasks("type"="insert") where 
jobid= '${pauseJobId.get(0).get(0)}'"""
     println taskStatus
     for (int i = 0; i < taskStatus.size(); i++) {
-        assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) 
!= "STOPPED"||taskStatus.get(i).get(0) != "STOPPED"
+        assert taskStatus.get(i).get(0) != "FAILED" || 
taskStatus.get(i).get(0) != "STOPPED" || taskStatus.get(i).get(0) != "STOPPED"
     }
     sql """
        CREATE JOB ${jobMixedName}  ON SCHEDULE every 1 second  DO insert into 
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
@@ -126,11 +149,11 @@ suite("test_base_insert_job") {
           CREATE JOB ${jobName}  ON SCHEDULE at current_timestamp   comment 
'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName}  values  
('2023-07-19', 2, 1001);
      """
 
-    Awaitility.await("create-one-time-job-test").atMost(30,SECONDS).until(
-        {
-            def onceJob = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' 
"""
-            onceJob.size() == 1 && '1' == onceJob.get(0).get(0)
-        }
+    Awaitility.await("create-one-time-job-test").atMost(30, SECONDS).until(
+            {
+                def onceJob = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' 
"""
+                onceJob.size() == 1 && '1' == onceJob.get(0).get(0)
+            }
     )
     def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") 
where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
     assert onceJob.size() == 1
@@ -141,7 +164,7 @@ suite("test_base_insert_job") {
     assert datas.size() == 1
     assert datas.get(0).get(0) == "FINISHED"
     // check table data
-    def dataCount1 = sql """select count(1) from ${tableName}"""
+    def dataCount1 = sql """select count(1) from ${tableName} where 
user_id=1001"""
     assert dataCount1.get(0).get(0) == 1
     // check job status
     def oncejob = sql """select status,comment from jobs("type"="insert") 
where Name='${jobName}' """
@@ -198,10 +221,10 @@ suite("test_base_insert_job") {
     println(tasks.size())
     Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
         def afterResumeTasks = sql """ select status from 
tasks("type"="insert") where JobName= '${jobName}'   """
-        println "resume tasks :"+afterResumeTasks
-        afterResumeTasks.size() >tasks.size()
+        println "resume tasks :" + afterResumeTasks
+        afterResumeTasks.size() > tasks.size()
     })
-   
+
     // assert same job name
     try {
         sql """
@@ -216,7 +239,7 @@ suite("test_base_insert_job") {
             CREATE JOB ${jobName}  ON SCHEDULE at current_timestamp   comment 
'test' DO update ${tableName} set type=2 where type=1;
         """
     } catch (Exception e) {
-        assert e.getMessage().contains("Not support this sql")
+        assert e.getMessage().contains("Not support this sql :")
     }
     // assert start time greater than current time
     try {
@@ -245,7 +268,7 @@ suite("test_base_insert_job") {
     // assert end time less than start time
     try {
         sql """
-            CREATE JOB test_error_starts  ON SCHEDULE every 1 second ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
+            CREATE JOB test_error_starts  ON SCHEDULE every 1 second starts 
current_timestamp ends '2023-11-13 14:18:07'   comment 'test' DO insert into 
${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213');
         """
     } catch (Exception e) {
         assert e.getMessage().contains("endTimeMs must be greater than the 
start time")
@@ -256,7 +279,7 @@ suite("test_base_insert_job") {
             CREATE JOB test_error_starts  ON SCHEDULE every 1 years ends 
'2023-11-13 14:18:07'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        assert e.getMessage().contains("interval time unit can not be years")
+        assert e.getMessage().contains("Invalid interval time unit: years")
     }
 
     // test keyword as job name


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to