This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 5af078bcdd5 [branch-3.0][Chore](job) Provides configuration of job execution queue size (#42253) (#42531) 5af078bcdd5 is described below commit 5af078bcdd516a47f3b2b1b8572e0a2fc9f5e206 Author: Calvin Kirs <k...@apache.org> AuthorDate: Mon Oct 28 13:42:18 2024 +0800 [branch-3.0][Chore](job) Provides configuration of job execution queue size (#42253) (#42531) When dealing with a large number of tasks, the default execution queue size is 1024. This can lead to tasks being dropped if the queue becomes full, e.g. with the error `dispatch instant task failed, job id is xxx`. To address this, you can add the parameters `insert_task_queue_size` and `mtmv_task_queue_size` in the `fe.conf` configuration file. These parameters should be set to a power of 2; any other value is rounded up to the next power of 2, and a value less than 1 falls back to the default of 1024. **Keep in mind, increasing these values is recommended only when thread resources are limited; otherwise, you should consider increasing the number of task execution threads.** (cherry picked from commit f9ea8f8229e9f5514c1773bd25c3cc11985c63fb) ## Proposed changes Issue Number: #42253 <!--Describe your changes.--> --- .../main/java/org/apache/doris/common/Config.java | 26 +++++++++++----- .../org/apache/doris/job/base/AbstractJob.java | 19 +++++++++++- .../main/java/org/apache/doris/job/base/Job.java | 7 +++++ .../doris/job/executor/TimerJobSchedulerTask.java | 10 ++++-- .../doris/job/extensions/insert/InsertJob.java | 6 ++++ .../apache/doris/job/extensions/mtmv/MTMVJob.java | 7 +++++ .../job/manager/TaskDisruptorGroupManager.java | 24 +++++++++++++-- .../apache/doris/job/scheduler/JobScheduler.java | 4 +-- .../job/manager/TaskDisruptorGroupManagerTest.java | 36 ++++++++++++++++++++++ regression-test/pipeline/p0/conf/fe.conf | 1 + 10 files changed, 125 insertions(+), 15 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1b0b3f77b2b..5f909ed84c1 100644 --- 
a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1746,7 +1746,7 @@ public class Config extends ConfigBase { * corresponding type of job * The value should be greater than 0, if it is 0 or <=0, set it to 5 */ - @ConfField(description = {"用于分发定时任务的线程数", + @ConfField(masterOnly = true, description = {"用于分发定时任务的线程数", "The number of threads used to dispatch timer job."}) public static int job_dispatch_timer_job_thread_num = 2; @@ -1757,24 +1757,34 @@ public class Config extends ConfigBase { * {@code @dispatch_timer_job_thread_num} * The value should be greater than 0, if it is 0 or <=0, set it to 1024 */ - @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) + @ConfField(masterOnly = true, description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) public static int job_dispatch_timer_job_queue_size = 1024; - @ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。", + @ConfField(masterOnly = true, description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。", "Maximum number of persistence allowed per task in a job,exceeding which old tasks will be discarded," + "If the value is less than 1, it will not be persisted." }) public static int max_persistence_task_count = 100; - @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", + @ConfField(masterOnly = true, description = {"MV task 的等待队列大小,如果是负数,则会使用 1024,如果不是 2 的幂,则会自动选择一个最接近的" + + " 2 的幂次方数", "The size of the MV task's waiting queue If the size is negative, 1024 will be used. 
If " + + "the size is not a power of two, the nearest power of the size will be" + + " automatically selected."}) + public static int mtmv_task_queue_size = 1024; + @ConfField(masterOnly = true, description = {"Insert task 的等待队列大小,如果是负数,则会使用 1024,如果不是 2 的幂,则会自动选择一个最接近" + + " 的 2 的幂次方数", "The size of the Insert task's waiting queue If the size is negative, 1024 will be used." + + " If the size is not a power of two, the nearest power of the size will " + + "be automatically selected."}) + public static int insert_task_queue_size = 1024; + @ConfField(masterOnly = true, description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"}) public static int finished_job_cleanup_threshold_time_hour = 24; - @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5", + @ConfField(masterOnly = true, description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为10", "The number of threads used to consume Insert tasks, " - + "the value should be greater than 0, if it is <=0, default is 5."}) + + "the value should be greater than 0, if it is <=0, default is 10."}) public static int job_insert_task_consumer_thread_num = 10; - @ConfField(description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为5", + @ConfField(masterOnly = true, description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为10", "The number of threads used to consume mtmv tasks, " - + "the value should be greater than 0, if it is <=0, default is 5."}) + + "the value should be greater than 0, if it is <=0, default is 10."}) public static int job_mtmv_task_consumer_thread_num = 10; /* job test config */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 94a0b0146cd..62ac0c4d59d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -63,7 +63,7 @@ public 
abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C new Column("SucceedTaskCount", ScalarType.createStringType()), new Column("FailedTaskCount", ScalarType.createStringType()), new Column("CanceledTaskCount", ScalarType.createStringType()) - ); + ); @SerializedName(value = "jid") private Long jobId; @@ -415,6 +415,23 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C return getCommonTvfInfo(); } + /** + * Generates a common error message when the execution queue is full. + * + * @param taskId The ID of the task. + * @param queueConfigName The name of the queue configuration. + * @param executeThreadConfigName The name of the execution thread configuration. + * @return A formatted error message. + */ + protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName, + String executeThreadConfigName) { + return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, " + + "you can increase the queue size by setting the property " + + "%s in the fe.conf file or increase the value of " + + "the property %s in the fe.conf file", getJobId(), getJobName(), taskId, queueConfigName, + executeThreadConfigName); + } + @Override public ShowResultSetMetaData getJobMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index 1124e7f2d28..a7e75554c71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -153,4 +153,11 @@ public interface Job<T extends AbstractTask, C> { * @return TRow */ TRow getTvfInfo(); + + /** + * Generates a common error message when the execution queue is full. 
+ * @param taskId task id + * @return error msg for user + */ + String formatMsgWhenExecuteQueueFull(Long taskId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 25bbccf3fa2..65a9cf2e091 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -45,11 +45,17 @@ public class TimerJobSchedulerTask<T extends AbstractJob> implements TimerTask { return; } if (!dispatchDisruptor.publishEvent(this.job)) { - log.warn("dispatch timer job failed, job id is {}, job name is {}", - this.job.getJobId(), this.job.getJobName()); + log.warn("dispatch timer job failed, queue maybe full. job id is {}, job name is {}", + this.job.getJobId(), this.job.getJobName() + getMsgWhenExecuteQueueFull()); } } catch (Exception e) { log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } } + + private String getMsgWhenExecuteQueueFull() { + return "you can increase the queue size by setting the property " + + "job_dispatch_timer_job_queue_size in the fe.conf file or increase the value of " + + "the property job_dispatch_timer_job_thread_num in the fe.conf file"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 487591efc04..ce35227feb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -522,6 +522,12 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl } } + @Override + public String formatMsgWhenExecuteQueueFull(Long taskId) { + return commonFormatMsgWhenExecuteQueueFull(taskId, "insert_task_queue_size", + 
"job_insert_task_consumer_thread_num"); + } + private String getPriority() { return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index add191001f9..62c005ca280 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -218,6 +218,13 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> { return data; } + @Override + public String formatMsgWhenExecuteQueueFull(Long taskId) { + return commonFormatMsgWhenExecuteQueueFull(taskId, "mtmv_task_queue_size", + "job_mtmv_task_consumer_thread_num"); + + } + @Override public TRow getTvfInfo() { TRow trow = new TRow(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index b1ccb976443..cc82b59a36a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -65,8 +65,8 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> { private static final int DISPATCH_MTMV_THREAD_NUM = Config.job_mtmv_task_consumer_thread_num > 0 ? 
Config.job_mtmv_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM; - private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; - private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE; + private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.insert_task_queue_size); + private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE = normalizeRingbufferSize(Config.mtmv_task_queue_size); public void init() { @@ -133,4 +133,24 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> { } + /** + * Normalizes the given size to the nearest power of two. + * This method ensures that the size is a power of two, which is often required for optimal + * performance in certain data structures like ring buffers. + * + * @param size The input size to be normalized. + * @return The nearest power of two greater than or equal to the input size. + */ + public static int normalizeRingbufferSize(int size) { + int ringBufferSize = size - 1; + if (size < 1) { + return DEFAULT_RING_BUFFER_SIZE; + } + ringBufferSize |= ringBufferSize >>> 1; + ringBufferSize |= ringBufferSize >>> 2; + ringBufferSize |= ringBufferSize >>> 4; + ringBufferSize |= ringBufferSize >>> 8; + ringBufferSize |= ringBufferSize >>> 16; + return ringBufferSize + 1; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 862b85597cd..ea0c263a5ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -168,8 +168,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { for (AbstractTask task : tasks) { if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), job.getJobConfig())) { - throw new JobException("dispatch instant task failed, job id is " - + 
job.getJobId() + ", task id is " + task.getTaskId()); + throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId())); + } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), job.getJobName(), task.getTaskId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java new file mode 100644 index 00000000000..661bdcdcca2 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.job.manager; + + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TaskDisruptorGroupManagerTest { + + @Test + public void testInit() { + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1024)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-1)); + Assertions.assertEquals(16, TaskDisruptorGroupManager.normalizeRingbufferSize(15)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1023)); + Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-8)); + Assertions.assertEquals(2048, TaskDisruptorGroupManager.normalizeRingbufferSize(1025)); + Assertions.assertEquals(4096, TaskDisruptorGroupManager.normalizeRingbufferSize(2049)); + } +} diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index ce965f7f996..48cc9598ef4 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -67,6 +67,7 @@ label_keep_max_second = 300 # job test configurations #allows the creation of jobs with an interval of second enable_job_schedule_second_for_test = true +mtmv_task_queue_size = 4096 enable_workload_group = true publish_topic_info_interval_ms = 1000 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org