This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new de90051162d [Fix](Job)Replaying logs should not modify the original information of the job (#40474) de90051162d is described below commit de90051162de7004cf171bbf4d21bd95ff9f3540 Author: Calvin Kirs <k...@apache.org> AuthorDate: Thu Sep 12 09:59:30 2024 +0800 [Fix](Job)Replaying logs should not modify the original information of the job (#40474) ## Proposed changes ``` JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); jobConfig.setExecuteType(JobExecuteType.INSTANT); setJobConfig(jobConfig); ``` - Replaying logs should not modify the original information of the job - Use the new optimizer to check whether the executed statement is legal --- .../org/apache/doris/analysis/CreateJobStmt.java | 40 +++++++---------- .../doris/job/extensions/insert/InsertJob.java | 19 +------- .../data/job_p0/job_meta/job_query_test.out | 7 +++ .../suites/job_p0/job_meta/job_query_test.groovy | 28 ++++++++++++ regression-test/suites/job_p0/job_meta/load.groovy | 50 ++++++++++++++++++++++ .../suites/job_p0/test_base_insert_job.groovy | 2 +- 6 files changed, 103 insertions(+), 43 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index b9d42b249b2..0fff1e09749 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -32,15 +32,15 @@ import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.ImmutableSet; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import java.util.HashSet; - /** * syntax: * CREATE @@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { // exclude job name prefix, which is used by inner job private static final String excludeJobNamePrefix = "inner_"; - private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass - = new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class) - .build(); - - private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16); - public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp, Long interval, String intervalTimeUnit, String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) { @@ -118,7 +112,6 @@ public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { labelName.analyze(analyzer); String dbName = labelName.getDbName(); Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); - analyzerSqlStmt(); // check its insert stmt,currently only support insert stmt //todo when support other stmt,need to check stmt type and generate jobInstance JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); @@ -164,6 +157,7 @@ public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { jobExecutionConfiguration.setTimerDefinition(timerDefinition); String originStmt = getOrigStmt().originStmt; String executeSql = parseExecuteSql(originStmt, jobName, comment); + analyzerSqlStmt(executeSql); // create job use label name as its job name InsertJob job = new InsertJob(jobName, JobStatus.RUNNING, @@ -191,22 +185,20 @@ public class CreateJobStmt extends DdlStmt implements NotFallbackInParser { } } - private void checkStmtSupport() throws AnalysisException { - if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) { - return; - } - for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) { - if (clazz.isAssignableFrom(doStmt.getClass())) { - supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName()); - return; + private void analyzerSqlStmt(String sql) throws UserException { + NereidsParser parser = new NereidsParser(); + LogicalPlan logicalPlan = parser.parseSingle(sql); + if (logicalPlan instanceof InsertIntoTableCommand) { + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan; + try { + insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor()); + } catch (Exception e) { + throw new AnalysisException(e.getMessage()); } - } - throw new AnalysisException("Not support " + doStmt.getClass().getSimpleName() + " type in job"); - } - private void analyzerSqlStmt() throws UserException { - checkStmtSupport(); - doStmt.analyze(analyzer); + } else { + throw new AnalysisException("Not support this sql : " + sql); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 47d52c170b2..43f43ba8699 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -31,8 +31,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.LogBuilder; -import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; @@ -647,23 +645,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl @Override public void onReplayCreate() throws JobException { - JobExecutionConfiguration jobConfig = new JobExecutionConfiguration(); - jobConfig.setExecuteType(JobExecuteType.INSTANT); - setJobConfig(jobConfig); onRegister(); - checkJobParams(); - log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build()); - } - - @Override - public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob) throws JobException { - if (!(replayJob instanceof InsertJob)) { - return; - } - InsertJob insertJob = (InsertJob) replayJob; - unprotectReadEndOperation(insertJob); - log.info(new LogBuilder(LogKey.LOAD_JOB, - insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build()); + super.onReplayCreate(); } public int getProgress() { diff --git a/regression-test/data/job_p0/job_meta/job_query_test.out b/regression-test/data/job_p0/job_meta/job_query_test.out new file mode 100644 index 00000000000..1a2bfe0f9cd --- /dev/null +++ b/regression-test/data/job_p0/job_meta/job_query_test.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +JOB_ONETIME ONE_TIME AT 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); + +-- !select2 -- +JOB_RECURRING RECURRING EVERY 1 HOUR STARTS 2052-03-18 00:00:00 insert into t_test_BASE_inSert_job (timestamp, type, user_id) values ('2023-03-18','1','12213'); + diff --git a/regression-test/suites/job_p0/job_meta/job_query_test.groovy b/regression-test/suites/job_p0/job_meta/job_query_test.groovy new file mode 100644 index 00000000000..3505a8108dd --- /dev/null +++ b/regression-test/suites/job_p0/job_meta/job_query_test.groovy @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite('job_query_test', 'p0,restart_fe') { + def oneTimeJobName = "JOB_ONETIME" + def recurringJobName = "JOB_RECURRING" + qt_select1 """ + select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${oneTimeJobName}' + """ + qt_select2 """ + select name, ExecuteType,RecurringStrategy,ExecuteSql from jobs("type" = "insert") where name = '${recurringJobName}' + """ + + +} \ No newline at end of file diff --git a/regression-test/suites/job_p0/job_meta/load.groovy b/regression-test/suites/job_p0/job_meta/load.groovy new file mode 100644 index 00000000000..bf7b8a12128 --- /dev/null +++ b/regression-test/suites/job_p0/job_meta/load.groovy @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('load', 'p0,restart_fe') { + def tableName = "t_test_BASE_inSert_job" + def oneTimeJobName = "JOB_ONETIME" + def recurringJobName = "JOB_RECURRING" + sql """drop table if exists `${tableName}` force""" + sql """ + DROP JOB IF EXISTS where jobname = '${oneTimeJobName}' + """ + sql """ + DROP JOB IF EXISTS where jobname = '${recurringJobName}' + """ + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` + ( + `timestamp` DATE NOT NULL COMMENT "['0000-01-01', '9999-12-31']", + `type` TINYINT NOT NULL COMMENT "[-128, 127]", + `user_id` BIGINT COMMENT "[-9223372036854775808, 9223372036854775807]" + ) + DUPLICATE KEY(`timestamp`, `type`) + DISTRIBUTED BY HASH(`type`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE JOB ${recurringJobName} ON SCHEDULE every 1 HOUR STARTS '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + + sql """ + CREATE JOB ${oneTimeJobName} ON SCHEDULE AT '2052-03-18 00:00:00' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + +} \ No newline at end of file diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index e67e65bf345..be744427d88 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -216,7 +216,7 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at current_timestamp comment 'test' DO update ${tableName} set type=2 where type=1; """ } catch (Exception e) { - assert e.getMessage().contains("Not support UpdateStmt type in job") + assert e.getMessage().contains("Not support this sql") } // assert start time greater than current time try { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org