This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de90051162d [Fix](Job)Replaying logs should not modify the original information of the job (#40474)
de90051162d is described below

commit de90051162de7004cf171bbf4d21bd95ff9f3540
Author: Calvin Kirs <k...@apache.org>
AuthorDate: Thu Sep 12 09:59:30 2024 +0800

    [Fix](Job)Replaying logs should not modify the original information of the job (#40474)
    
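    ## Proposed changes
    Previously, `InsertJob.onReplayCreate()` replaced the job's configuration
    with a new INSTANT configuration every time the create log was replayed:
    ```
            JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
            jobConfig.setExecuteType(JobExecuteType.INSTANT);
            setJobConfig(jobConfig);
    ```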
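    - Replaying logs must not modify the original information of the job, so
      the configuration override shown in the snippet is removed from
      `onReplayCreate()`
    - Use the new optimizer (Nereids) to check whether the statement the job
      will execute is legal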
---
 .../org/apache/doris/analysis/CreateJobStmt.java   | 40 +++++++----------
 .../doris/job/extensions/insert/InsertJob.java     | 19 +-------
 .../data/job_p0/job_meta/job_query_test.out        |  7 +++
 .../suites/job_p0/job_meta/job_query_test.groovy   | 28 ++++++++++++
 regression-test/suites/job_p0/job_meta/load.groovy | 50 ++++++++++++++++++++++
 .../suites/job_p0/test_base_insert_job.groovy      |  2 +-
 6 files changed, 103 insertions(+), 43 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
index b9d42b249b2..0fff1e09749 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java
@@ -32,15 +32,15 @@ import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.parser.NereidsParser;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.ImmutableSet;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.HashSet;
-
 /**
  * syntax:
  * CREATE
@@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt implements 
NotFallbackInParser {
     // exclude job name prefix, which is used by inner job
     private static final String excludeJobNamePrefix = "inner_";
 
-    private static final ImmutableSet<Class<? extends DdlStmt>> 
supportStmtSuperClass
-            = new ImmutableSet.Builder<Class<? extends 
DdlStmt>>().add(InsertStmt.class)
-            .build();
-
-    private static final HashSet<String> supportStmtClassNamesCache = new 
HashSet<>(16);
-
     public CreateJobStmt(LabelName labelName, JobExecuteType executeType, 
String onceJobStartTimestamp,
                          Long interval, String intervalTimeUnit,
                          String startsTimeStamp, String endsTimeStamp, String 
comment, StatementBase doStmt) {
@@ -118,7 +112,6 @@ public class CreateJobStmt extends DdlStmt implements 
NotFallbackInParser {
         labelName.analyze(analyzer);
         String dbName = labelName.getDbName();
         Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
-        analyzerSqlStmt();
         // check its insert stmt,currently only support insert stmt
         //todo when support other stmt,need to check stmt type and generate 
jobInstance
         JobExecutionConfiguration jobExecutionConfiguration = new 
JobExecutionConfiguration();
@@ -164,6 +157,7 @@ public class CreateJobStmt extends DdlStmt implements 
NotFallbackInParser {
         jobExecutionConfiguration.setTimerDefinition(timerDefinition);
         String originStmt = getOrigStmt().originStmt;
         String executeSql = parseExecuteSql(originStmt, jobName, comment);
+        analyzerSqlStmt(executeSql);
         // create job use label name as its job name
         InsertJob job = new InsertJob(jobName,
                 JobStatus.RUNNING,
@@ -191,22 +185,20 @@ public class CreateJobStmt extends DdlStmt implements 
NotFallbackInParser {
         }
     }
 
-    private void checkStmtSupport() throws AnalysisException {
-        if 
(supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
-            return;
-        }
-        for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
-            if (clazz.isAssignableFrom(doStmt.getClass())) {
-                
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
-                return;
+    private void analyzerSqlStmt(String sql) throws UserException {
+        NereidsParser parser = new NereidsParser();
+        LogicalPlan logicalPlan = parser.parseSingle(sql);
+        if (logicalPlan instanceof InsertIntoTableCommand) {
+            InsertIntoTableCommand insertIntoTableCommand = 
(InsertIntoTableCommand) logicalPlan;
+            try {
+                insertIntoTableCommand.initPlan(ConnectContext.get(), 
ConnectContext.get().getExecutor());
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage());
             }
-        }
-        throw new AnalysisException("Not support " + 
doStmt.getClass().getSimpleName() + " type in job");
-    }
 
-    private void analyzerSqlStmt() throws UserException {
-        checkStmtSupport();
-        doStmt.analyze(analyzer);
+        } else {
+            throw new AnalysisException("Not support this sql : " + sql);
+        }
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 47d52c170b2..43f43ba8699 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -31,8 +31,6 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.LogBuilder;
-import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.base.AbstractJob;
@@ -647,23 +645,8 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
 
     @Override
     public void onReplayCreate() throws JobException {
-        JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
-        jobConfig.setExecuteType(JobExecuteType.INSTANT);
-        setJobConfig(jobConfig);
         onRegister();
-        checkJobParams();
-        log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", 
"replay create load job").build());
-    }
-
-    @Override
-    public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob) 
throws JobException {
-        if (!(replayJob instanceof InsertJob)) {
-            return;
-        }
-        InsertJob insertJob = (InsertJob) replayJob;
-        unprotectReadEndOperation(insertJob);
-        log.info(new LogBuilder(LogKey.LOAD_JOB,
-                insertJob.getJobId()).add("operation", insertJob).add("msg", 
"replay end load job").build());
+        super.onReplayCreate();
     }
 
     public int getProgress() {
diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out 
b/regression-test/data/job_p0/job_meta/job_query_test.out
new file mode 100644
index 00000000000..1a2bfe0f9cd
--- /dev/null
+++ b/regression-test/data/job_p0/job_meta/job_query_test.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1 --
+JOB_ONETIME    ONE_TIME        AT 2052-03-18 00:00:00  insert into 
t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+
+-- !select2 --
+JOB_RECURRING  RECURRING       EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert 
into t_test_BASE_inSert_job (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+
diff --git a/regression-test/suites/job_p0/job_meta/job_query_test.groovy 
b/regression-test/suites/job_p0/job_meta/job_query_test.groovy
new file mode 100644
index 00000000000..3505a8108dd
--- /dev/null
+++ b/regression-test/suites/job_p0/job_meta/job_query_test.groovy
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite('job_query_test', 'p0,restart_fe') {
+    def oneTimeJobName = "JOB_ONETIME"
+    def recurringJobName = "JOB_RECURRING"
+    qt_select1  """
+       select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" 
= "insert") where name = '${oneTimeJobName}'
+    """
+    qt_select2  """
+       select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" 
= "insert") where name = '${recurringJobName}'
+    """
+   
+    
+}
\ No newline at end of file
diff --git a/regression-test/suites/job_p0/job_meta/load.groovy 
b/regression-test/suites/job_p0/job_meta/load.groovy
new file mode 100644
index 00000000000..bf7b8a12128
--- /dev/null
+++ b/regression-test/suites/job_p0/job_meta/load.groovy
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('load', 'p0,restart_fe') {
+    def tableName = "t_test_BASE_inSert_job"
+    def oneTimeJobName = "JOB_ONETIME"
+    def recurringJobName = "JOB_RECURRING"
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+       DROP JOB IF EXISTS where jobname =  '${oneTimeJobName}'
+    """
+    sql """
+       DROP JOB IF EXISTS where jobname =  '${recurringJobName}'
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS `${tableName}`
+        (
+            `timestamp` DATE NOT NULL COMMENT "['0000-01-01', '9999-12-31']",
+            `type` TINYINT NOT NULL COMMENT "[-128, 127]",
+            `user_id` BIGINT COMMENT "[-9223372036854775808, 
9223372036854775807]"
+        )
+            DUPLICATE KEY(`timestamp`, `type`)
+        DISTRIBUTED BY HASH(`type`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+        """
+    sql """
+       CREATE JOB ${recurringJobName}  ON SCHEDULE every 1 HOUR STARTS 
'2052-03-18 00:00:00'   comment 'test' DO insert into ${tableName} (timestamp, 
type, user_id) values ('2023-03-18','1','12213');
+    """
+
+    sql """
+       CREATE JOB ${oneTimeJobName}  ON SCHEDULE  AT '2052-03-18 00:00:00'  
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+    """
+
+}
\ No newline at end of file
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index e67e65bf345..be744427d88 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -216,7 +216,7 @@ suite("test_base_insert_job") {
             CREATE JOB ${jobName}  ON SCHEDULE at current_timestamp   comment 
'test' DO update ${tableName} set type=2 where type=1;
         """
     } catch (Exception e) {
-        assert e.getMessage().contains("Not support UpdateStmt type in job")
+        assert e.getMessage().contains("Not support this sql")
     }
     // assert start time greater than current time
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to