morningman commented on code in PR #26845: URL: https://github.com/apache/doris/pull/26845#discussion_r1391106633
########## fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java: ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.qe.ShowResultSetMetaData; + +import java.util.List; + +/** + * Job is the core of the scheduler module, which is used to store the Job information of the job module. + * We can use the job to uniquely identify a Job. + * The jobName is used to identify the job, which is not unique. + * The jobStatus is used to identify the status of the Job, which is used to control the execution of the + * job. + */ +public interface Job<T extends AbstractTask> { + + List<T> createTasks(TaskType taskType); Review Comment: Add comments for each interface' method ########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.insert; + +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadJob; +import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.qe.ShowResultSetMetaData; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class InsertTask extends AbstractTask { + + private String labelName; + + private InsertIntoTableCommand command; + private LoadJob.LoadStatistic statistic; + private FailMsg failMsg; + + private InsertIntoState insertIntoState; + + @Override + public void before() { + super.before(); + } + + public InsertTask(String labelName, InsertIntoTableCommand command, LoadJob.LoadStatistic statistic, + FailMsg failMsg, InsertIntoState insertIntoState) { + this.labelName = labelName; + this.command = command; + this.statistic = statistic; + this.failMsg = failMsg; + this.insertIntoState = insertIntoState; + } + + @Override + public void run() { + //insertIntoState = command. + System.out.println(getJobId() + "InsertTask run" + TimeUtils.longToTimeString(System.currentTimeMillis())); Review Comment: `System.out.println`? ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java: ########## @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import jline.internal.Log; Review Comment: What is this? ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.scheduler.JobScheduler; +import org.apache.doris.job.task.AbstractTask; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class JobManager<T extends AbstractJob<?>> implements Writable { Review Comment: Do we need to use template for JobManager? ########## fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.job.task.Task; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Data +public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable { + + @SerializedName(value = "jobId") + private Long jobId; + + @SerializedName(value = "jobName") + private String jobName; + + @SerializedName(value = "jobStatus") + private JobStatus jobStatus; + + @SerializedName(value = "currentDbName") + private String currentDbName; + + @SerializedName(value = "comment") + private String comment; + + @SerializedName(value = "jobType") + private String createUser; + + @SerializedName(value = "jobConfig") + private JobExecutionConfiguration jobConfig; + + @SerializedName(value = "createTimeMs") + private Long createTimeMs; + + @SerializedName(value = "executeSql") + String executeSql; + + private List<T> runningTasks = new ArrayList<>(); + + @Override + public void cancel() throws JobException { + if (CollectionUtils.isEmpty(runningTasks)) { + return; + } + runningTasks.forEach(Task::cancel); + + } + + public void initTasks(List<T> tasks) { + tasks.forEach(task -> { + task.setJobId(jobId); + task.setTaskId(Env.getCurrentEnv().getNextId()); Review Comment: `Env.getCurrentEnv().getNextId()` may write edit log, So it need to be very carefully to call `initTasks()`. I think we should find a better to way to init tasks, eg, without calling `Env.getCurrentEnv().getNextId()` inside ########## fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.job.task.Task; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Data +public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable { + + @SerializedName(value = "jobId") + private Long jobId; + + @SerializedName(value = "jobName") + private String jobName; + + @SerializedName(value = "jobStatus") + private JobStatus jobStatus; + + @SerializedName(value = "currentDbName") + private String currentDbName; + + @SerializedName(value = "comment") + private String comment; + + @SerializedName(value = "jobType") + private String createUser; + + @SerializedName(value = "jobConfig") + private JobExecutionConfiguration jobConfig; + + @SerializedName(value = "createTimeMs") + private Long createTimeMs; + + @SerializedName(value = "executeSql") + String executeSql; + + private List<T> runningTasks = new ArrayList<>(); + + @Override + public void cancel() throws JobException { + if (CollectionUtils.isEmpty(runningTasks)) { + return; + } + runningTasks.forEach(Task::cancel); + + } + + public void initTasks(List<T> tasks) { + tasks.forEach(task -> { + task.setJobId(jobId); + task.setTaskId(Env.getCurrentEnv().getNextId()); + task.setCreateTimeMs(System.currentTimeMillis()); + task.setStatus(TaskStatus.PENDING); + }); + } + + public void checkJobParams() { + if (null == jobConfig) { + throw new IllegalArgumentException("jobConfig cannot be null"); + } + jobConfig.checkParams(createTimeMs); + checkJobParamsInternal(); + } + + public void updateJobStatus(JobStatus newJobStatus) { + if (null == newJobStatus) { + throw new IllegalArgumentException("jobStatus cannot be null"); + } + if (jobStatus == newJobStatus) { + throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", + jobStatus.name(), this.jobStatus.name())); + } + if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) { + throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", + jobStatus.name(), this.jobStatus.name())); + } + if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) { + throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", + jobStatus.name(), this.jobStatus.name())); + } + jobStatus = newJobStatus; + } + + + protected abstract void checkJobParamsInternal(); + + public static AbstractJob readFields(DataInput in) throws IOException { + JobType jobType = JobType.valueOf(Text.readString(in)); Review Comment: Better use `RuntimeTypeAdapterFactory` of Gson to do the serde ########## fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java: ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.qe.ShowResultSetMetaData; + +import java.util.List; + +/** + * Job is the core of the scheduler module, which is used to store the Job information of the job module. + * We can use the job to uniquely identify a Job. + * The jobName is used to identify the job, which is not unique. + * The jobStatus is used to identify the status of the Job, which is used to control the execution of the + * job. + */ +public interface Job<T extends AbstractTask> { + + List<T> createTasks(TaskType taskType); + + void cancel(T task) throws JobException; + + boolean isReadyForScheduling(); + + + ShowResultSetMetaData getJobMetaData(); + + ShowResultSetMetaData getTaskMetaData(); + + JobType getJobType(); + + List<T> queryTasks(); + + void cancel() throws JobException; + + void onTaskFail(T task); Review Comment: Should it be the `Task`'s methods? ########## fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java: ########## @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.extensions.insert.InsertJob; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.job.task.Task; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import org.apache.commons.collections.CollectionUtils; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@Data +public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable { + + @SerializedName(value = "jobId") Review Comment: Use abbr for the `SerializedName`, such as `jid` for `jobId` ########## fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java: ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.base; + +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.qe.ShowResultSetMetaData; + +import java.util.List; + +/** + * Job is the core of the scheduler module, which is used to store the Job information of the job module. + * We can use the job to uniquely identify a Job. + * The jobName is used to identify the job, which is not unique. + * The jobStatus is used to identify the status of the Job, which is used to control the execution of the + * job. + */ +public interface Job<T extends AbstractTask> { + + List<T> createTasks(TaskType taskType); + + void cancel(T task) throws JobException; Review Comment: better pass task id? ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.scheduler.JobScheduler; +import org.apache.doris.job.task.AbstractTask; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class JobManager<T extends AbstractJob<?>> implements Writable { + + + private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32); + + private JobScheduler jobScheduler; + + public void start() { + jobScheduler = new JobScheduler(jobMap); + jobScheduler.start(); + } + + public Long registerJob(T job) throws JobException { + //job.checkJobParams(); + checkJobNameExist(job.getJobName(), job.getJobType(), job.getCurrentDbName()); + long id = Env.getCurrentEnv().getNextId(); + job.setJobId(id); + Env.getCurrentEnv().getEditLog().logCreateJob(job); + //check name exist + jobMap.put(id, job); + //check its need to scheduler + jobScheduler.scheduleOneJob(job); + return id; + } + + + private void checkJobNameExist(String jobName, JobType type, String currentDbName) throws JobException { + if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type) + && (null == a.getCurrentDbName() || a.getCurrentDbName().equals(currentDbName)))) { + throw new JobException("job name exist,jobName:" + jobName); + } + } + + public void unregisterJob(Long jobId) throws JobException { + checkJobExist(jobId); + jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); + jobMap.get(jobId).cancel(); + Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId)); + jobMap.remove(jobId); + } + + public void unregisterJob(String currentDbName, String jobName) throws JobException { + for (T a : jobMap.values()) { + if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName() + && a.getCurrentDbName().equals(currentDbName)) && a.getJobType().equals(JobType.INSERT)) { + try { + unregisterJob(a.getJobId()); + } catch (JobException e) { + throw new JobException("unregister job error,jobName:" + jobName); + } + } + } + + } + + public void alterJobStatus(Long jobId, JobStatus status) throws JobException { + checkJobExist(jobId); + jobMap.get(jobId).updateJobStatus(status); + Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId)); + } + + public void alterJobStatus(String currentDbName, String jobName, JobStatus jobStatus) throws JobException { + for (T a : jobMap.values()) { + if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName() + && a.getCurrentDbName().equals(currentDbName)) && JobType.INSERT.equals(a.getJobType())) { + try { + alterJobStatus(a.getJobId(), jobStatus); + } catch (JobException e) { + throw new JobException("unregister job error,jobName:" + jobName); + } + } + } + } + + private void checkJobExist(Long jobId) throws JobException { + if (null == jobMap.get(jobId)) { + throw new JobException("job not exist,jobId:" + jobId); + } + } + + public List<T> queryJobs(JobType type) { + return jobMap.values().stream().filter(a -> a.getJobType().equals(type)) + .collect(java.util.stream.Collectors.toList()); + } + + public List<T> queryJobs(String currentDb, String jobName) { + //only query insert job,we just provide insert job + return jobMap.values().stream().filter(a -> checkItsMatch(currentDb, jobName, a)) + .collect(Collectors.toList()); + } + + private boolean checkItsMatch(String currentDb, String jobName, T job) { + if (StringUtils.isBlank(jobName)) { + return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName() + && job.getCurrentDbName().equals(currentDb)); + } + return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName() + && job.getCurrentDbName().equals(currentDb)) && job.getJobName().equals(jobName); + } + + public List<? extends AbstractTask> queryTasks(Long jobId) throws JobException { + checkJobExist(jobId); + return jobMap.get(jobId).queryTasks(); + } + + public void replayCreateJob(T job) { + if (jobMap.containsKey(job.getJobId())) { + return; + } + jobMap.putIfAbsent(job.getJobId(), job); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay create scheduler job").build()); + } + + /** + * Replay update load job. + **/ + public void replayUpdateJob(T job) { + jobMap.put(job.getJobId(), job); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay update scheduler job").build()); + } + + public void replayDeleteJob(T job) { + if (null == jobMap.get(job.getJobId())) { + return; + } + jobMap.remove(job.getJobId()); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay delete scheduler job").build()); + } + + void cancelTask(Long jobId, Long taskId) throws JobException { + checkJobExist(jobId); + if (null == jobMap.get(jobId).getRunningTasks()) { + throw new JobException("task not exist,taskId:" + taskId); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(jobMap.size()); Review Comment: Use Gson ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.scheduler.JobScheduler; +import org.apache.doris.job.task.AbstractTask; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class JobManager<T extends AbstractJob<?>> implements Writable { + + + private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32); + + private JobScheduler jobScheduler; + + public void start() { + jobScheduler = new JobScheduler(jobMap); + jobScheduler.start(); + } + + public Long registerJob(T job) throws JobException { + //job.checkJobParams(); + checkJobNameExist(job.getJobName(), job.getJobType(), job.getCurrentDbName()); + long id = Env.getCurrentEnv().getNextId(); Review Comment: I suggest that the job id should be assigned when job is created. Not in `registerJob()` method, so it will easier to write unit test ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java: ########## @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.disruptor.ExecuteTaskEvent; +import org.apache.doris.job.disruptor.TaskDisruptor; +import org.apache.doris.job.disruptor.TimerJobEvent; +import org.apache.doris.job.executor.DefaultTaskExecutorHandler; +import org.apache.doris.job.executor.DispatchTaskHandler; +import org.apache.doris.job.extensions.insert.InsertTask; +import org.apache.doris.job.task.AbstractTask; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventTranslatorVararg; +import com.lmax.disruptor.WorkHandler; +import lombok.Getter; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; + +public class TaskDisruptorGroupManager<T extends AbstractTask> { + + private final Map<JobType, TaskDisruptor<T>> disruptorMap = new EnumMap<>(JobType.class); + + @Getter + private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor; + + + public void init() { + registerInsertDisruptor(); + //when all task queue is ready, dispatch task to registered task executor + registerDispatchDisruptor(); + } + + private void registerDispatchDisruptor() { + EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory = TimerJobEvent.factory(); + ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task"); + WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[5]; + for (int i = 0; i < 5; i++) { + dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap); + } + EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator = + (event, sequence, args) -> event.setJob((AbstractJob<T>) args[0]); + this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, 1024, dispatchThreadFactory, + new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); + } + + private void registerInsertDisruptor() { + EventFactory<ExecuteTaskEvent<InsertTask>> insertEventFactory = ExecuteTaskEvent.factory(); + ThreadFactory insertTaskThreadFactory = new CustomThreadFactory("insert-task-execute"); + WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[5]; + for (int i = 0; i < 5; i++) { + insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler<InsertTask>(); + } + EventTranslatorVararg<ExecuteTaskEvent<InsertTask>> eventTranslator = + (event, sequence, args) -> { + event.setTask((InsertTask) args[0]); + event.setJobConfig((JobExecutionConfiguration) args[1]); + }; + TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, 1024, Review Comment: 1024 is a magic number ########## fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.scheduler; + +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.disruptor.TaskDisruptor; +import org.apache.doris.job.executor.TimerJobSchedulerTask; +import org.apache.doris.job.manager.TaskDisruptorGroupManager; +import org.apache.doris.job.task.AbstractTask; + +import io.netty.util.HashedWheelTimer; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class JobScheduler<T extends AbstractJob<?>> implements Closeable { Review Comment: Why using template? ########## fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.scheduler; + +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecuteType; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.disruptor.TaskDisruptor; +import org.apache.doris.job.executor.TimerJobSchedulerTask; +import org.apache.doris.job.manager.TaskDisruptorGroupManager; +import org.apache.doris.job.task.AbstractTask; + +import io.netty.util.HashedWheelTimer; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class JobScheduler<T extends AbstractJob<?>> implements Closeable { + + /** + * scheduler tasks, it's used to scheduler job + */ + private HashedWheelTimer timerTaskScheduler; + + private TaskDisruptor timerJobDisruptor; + + private TaskDisruptorGroupManager taskDisruptorGroupManager; + + private long latestBatchSchedulerTimerTaskTimeMs = 0L; + + private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 60; + + private final Map<Long, T> jobMap; + + public JobScheduler(Map<Long, T> jobMap) { + this.jobMap = jobMap; + } + + /** + * batch scheduler interval ms time + */ + private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L; + + public void start() { + timerTaskScheduler = new HashedWheelTimer(new CustomThreadFactory("timer-task-scheduler"), 1, + TimeUnit.SECONDS, 660); Review Comment: 660 is a magic number ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java: ########## @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.common.CustomThreadFactory; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.base.JobExecutionConfiguration; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.disruptor.ExecuteTaskEvent; +import org.apache.doris.job.disruptor.TaskDisruptor; +import org.apache.doris.job.disruptor.TimerJobEvent; +import org.apache.doris.job.executor.DefaultTaskExecutorHandler; +import org.apache.doris.job.executor.DispatchTaskHandler; +import org.apache.doris.job.extensions.insert.InsertTask; +import org.apache.doris.job.task.AbstractTask; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventTranslatorVararg; +import com.lmax.disruptor.WorkHandler; +import lombok.Getter; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; + +public class TaskDisruptorGroupManager<T extends AbstractTask> { + + private final Map<JobType, TaskDisruptor<T>> disruptorMap = new EnumMap<>(JobType.class); + + @Getter + private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor; + + + public void init() { + registerInsertDisruptor(); + //when all task queue is ready, dispatch task to registered task executor + registerDispatchDisruptor(); + } + + private void registerDispatchDisruptor() { + EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory = TimerJobEvent.factory(); + ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task"); + WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[5]; + for (int i = 0; i < 5; i++) { + dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap); + } + EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator = + (event, sequence, args) -> event.setJob((AbstractJob<T>) args[0]); + this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, 1024, dispatchThreadFactory, + new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); + } + + private void registerInsertDisruptor() { + EventFactory<ExecuteTaskEvent<InsertTask>> insertEventFactory = ExecuteTaskEvent.factory(); + ThreadFactory insertTaskThreadFactory = new CustomThreadFactory("insert-task-execute"); + WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[5]; + for (int i = 0; i < 5; i++) { Review Comment: 5 is a magic number ########## fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; +import org.apache.doris.job.common.JobType; +import org.apache.doris.job.exception.JobException; +import org.apache.doris.job.scheduler.JobScheduler; +import org.apache.doris.job.task.AbstractTask; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class JobManager<T extends AbstractJob<?>> implements Writable { Review Comment: Do we need to use template for JobManager? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org