This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f9ea8f8229e [Chore](job) Provides configuration of job execution queue
size (#42253)
f9ea8f8229e is described below
commit f9ea8f8229e9f5514c1773bd25c3cc11985c63fb
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Oct 23 14:23:19 2024 +0800
[Chore](job) Provides configuration of job execution queue size (#42253)
The default size of the task execution queue is 1024. When a large number
of tasks are dispatched at once, the queue can fill up, and tasks that do
not fit are dropped with an error such as:
`dispatch instant task failed, job id is xxx`
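To address this, you can set the parameters `insert_task_queue_size` and
`mtmv_task_queue_size` in the `fe.conf` configuration file. These
parameters should be a power of 2: a value that is not a power of 2 is
rounded up to the next power of 2, and a value less than 1 falls back to
1024.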
**Keep in mind, increasing these queue sizes is recommended only when thread
resources are limited; otherwise, you should consider increasing the
number of task execution threads (`job_insert_task_consumer_thread_num`,
`job_mtmv_task_consumer_thread_num`).**
---
.../main/java/org/apache/doris/common/Config.java | 26 +++++++++++-----
.../org/apache/doris/job/base/AbstractJob.java | 19 +++++++++++-
.../main/java/org/apache/doris/job/base/Job.java | 7 +++++
.../doris/job/executor/TimerJobSchedulerTask.java | 10 ++++--
.../doris/job/extensions/insert/InsertJob.java | 6 ++++
.../apache/doris/job/extensions/mtmv/MTMVJob.java | 7 +++++
.../job/manager/TaskDisruptorGroupManager.java | 24 +++++++++++++--
.../apache/doris/job/scheduler/JobScheduler.java | 4 +--
.../job/manager/TaskDisruptorGroupManagerTest.java | 36 ++++++++++++++++++++++
regression-test/pipeline/p0/conf/fe.conf | 1 +
10 files changed, 125 insertions(+), 15 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index fd8e2ad87bc..97bcb39403a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1759,7 +1759,7 @@ public class Config extends ConfigBase {
* corresponding type of job
* The value should be greater than 0, if it is 0 or <=0, set it to 5
*/
- @ConfField(description = {"用于分发定时任务的线程数",
+ @ConfField(masterOnly = true, description = {"用于分发定时任务的线程数",
"The number of threads used to dispatch timer job."})
public static int job_dispatch_timer_job_thread_num = 2;
@@ -1770,24 +1770,34 @@ public class Config extends ConfigBase {
* {@code @dispatch_timer_job_thread_num}
* The value should be greater than 0, if it is 0 or <=0, set it to 1024
*/
- @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs
that can be queued."})
+ @ConfField(masterOnly = true, description = {"任务堆积时用于存放定时任务的队列大小", "The
number of timer jobs that can be queued."})
public static int job_dispatch_timer_job_queue_size = 1024;
- @ConfField(description = {"一个 Job 的 task 最大的持久化数量,超过这个限制将会丢弃旧的 task 记录,
如果值 < 1, 将不会持久化。",
+ @ConfField(masterOnly = true, description = {"一个 Job 的 task
最大的持久化数量,超过这个限制将会丢弃旧的 task 记录, 如果值 < 1, 将不会持久化。",
"Maximum number of persistence allowed per task in a job,exceeding
which old tasks will be discarded,"
+ "If the value is less than 1, it will not be persisted."
})
public static int max_persistence_task_count = 100;
- @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时",
+ /**
+  * Size of the waiting queue for MTMV tasks.
+  * TaskDisruptorGroupManager normalizes the value before use: a value below 1 falls back to
+  * the default ring buffer size, and a value that is not a power of two is rounded up.
+  */
+ @ConfField(masterOnly = true, description = {"MV task 的等待队列大小,如果小于 1,则会使用 1024,"
+         + "如果不是 2 的幂,则会自动向上取整为不小于该值的最小的 2 的幂次方数",
+         "The size of the MV task's waiting queue. If the size is less than 1, 1024 will be used. "
+         + "If the size is not a power of two, it is automatically rounded up to the next "
+         + "power of two."})
+ public static int mtmv_task_queue_size = 1024;
+ /**
+  * Size of the waiting queue for Insert tasks.
+  * TaskDisruptorGroupManager normalizes the value before use: a value below 1 falls back to
+  * the default ring buffer size, and a value that is not a power of two is rounded up.
+  */
+ @ConfField(masterOnly = true, description = {"Insert task 的等待队列大小,如果小于 1,则会使用 1024,"
+         + "如果不是 2 的幂,则会自动向上取整为不小于该值的最小的 2 的幂次方数",
+         "The size of the Insert task's waiting queue. If the size is less than 1, 1024 will be used. "
+         + "If the size is not a power of two, it is automatically rounded up to the next "
+         + "power of two."})
+ public static int insert_task_queue_size = 1024;
+ @ConfField(masterOnly = true, description = {"finished 状态的 job
最长保存时间,超过这个时间将会被删除, 单位:小时",
"The longest time to save the job in finished status, it will be
deleted after this time. Unit: hour"})
public static int finished_job_cleanup_threshold_time_hour = 24;
- @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5",
+ @ConfField(masterOnly = true, description = {"用于执行 Insert
任务的线程数,值应该大于0,否则默认为10",
"The number of threads used to consume Insert tasks, "
- + "the value should be greater than 0, if it is <=0,
default is 5."})
+ + "the value should be greater than 0, if it is <=0,
default is 10."})
public static int job_insert_task_consumer_thread_num = 10;
- @ConfField(description = {"用于执行 MTMV 任务的线程数,值应该大于0,否则默认为5",
+ @ConfField(masterOnly = true, description = {"用于执行 MTMV
任务的线程数,值应该大于0,否则默认为10",
"The number of threads used to consume mtmv tasks, "
- + "the value should be greater than 0, if it is <=0,
default is 5."})
+ + "the value should be greater than 0, if it is <=0,
default is 10."})
public static int job_mtmv_task_consumer_thread_num = 10;
/* job test config */
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 94a0b0146cd..62ac0c4d59d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -63,7 +63,7 @@ public abstract class AbstractJob<T extends AbstractTask, C>
implements Job<T, C
new Column("SucceedTaskCount", ScalarType.createStringType()),
new Column("FailedTaskCount", ScalarType.createStringType()),
new Column("CanceledTaskCount", ScalarType.createStringType())
- );
+ );
@SerializedName(value = "jid")
private Long jobId;
@@ -415,6 +415,23 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
return getCommonTvfInfo();
}
+ /**
+ * Generates a common error message when the execution queue is full.
+ *
+ * @param taskId The ID of the task.
+ * @param queueConfigName The name of the queue configuration.
+ * @param executeThreadConfigName The name of the execution thread
configuration.
+ * @return A formatted error message.
+ */
+ protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String
queueConfigName,
+ String
executeThreadConfigName) {
+ return String.format("Dispatch task failed, jobId: %d, jobName: %s,
taskId: %d, the queue size is full, "
+ + "you can increase the queue size by setting the
property "
+ + "%s in the fe.conf file or increase the value of "
+ + "the property %s in the fe.conf file", getJobId(),
getJobName(), taskId, queueConfigName,
+ executeThreadConfigName);
+ }
+
@Override
public ShowResultSetMetaData getJobMetaData() {
ShowResultSetMetaData.Builder builder =
ShowResultSetMetaData.builder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index 1124e7f2d28..a7e75554c71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -153,4 +153,11 @@ public interface Job<T extends AbstractTask, C> {
* @return TRow
*/
TRow getTvfInfo();
+
+ /**
+  * Builds the error message shown to the user when a task of this job cannot be
+  * dispatched because the execution queue is full.
+  *
+  * @param taskId id of the task that could not be dispatched
+  * @return the error message for the user
+  */
+ String formatMsgWhenExecuteQueueFull(Long taskId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 25bbccf3fa2..65a9cf2e091 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -45,11 +45,17 @@ public class TimerJobSchedulerTask<T extends AbstractJob>
implements TimerTask {
return;
}
if (!dispatchDisruptor.publishEvent(this.job)) {
- log.warn("dispatch timer job failed, job id is {}, job name is
{}",
- this.job.getJobId(), this.job.getJobName());
+ log.warn("dispatch timer job failed, queue maybe full. job id
is {}, job name is {}",
+ this.job.getJobId(), this.job.getJobName() +
getMsgWhenExecuteQueueFull());
}
} catch (Exception e) {
log.warn("dispatch timer job error, task id is {}",
this.job.getJobId(), e);
}
}
+
+ /**
+  * Returns the tuning hint that is appended to the warning logged when a timer job
+  * cannot be published to the dispatch queue.
+  *
+  * <p>The hint starts with a separator because the caller concatenates it directly
+  * after the job name; without it the two run together in the log line.
+  *
+  * @return the hint naming the fe.conf properties that can be increased
+  */
+ private String getMsgWhenExecuteQueueFull() {
+     return ", you can increase the queue size by setting the property "
+             + "job_dispatch_timer_job_queue_size in the fe.conf file or increase the value of "
+             + "the property job_dispatch_timer_job_thread_num in the fe.conf file";
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 487591efc04..ce35227feb1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -522,6 +522,12 @@ public class InsertJob extends AbstractJob<InsertTask,
Map<Object, Object>> impl
}
}
+ /**
+  * Builds the queue-full message for an insert task, naming the fe.conf properties
+  * {@code insert_task_queue_size} and {@code job_insert_task_consumer_thread_num}.
+  *
+  * @param taskId id of the task that could not be dispatched
+  * @return the message produced by {@code commonFormatMsgWhenExecuteQueueFull}
+  */
+ @Override
+ public String formatMsgWhenExecuteQueueFull(Long taskId) {
+     final String queueSizeProperty = "insert_task_queue_size";
+     final String consumerThreadProperty = "job_insert_task_consumer_thread_num";
+     return commonFormatMsgWhenExecuteQueueFull(taskId, queueSizeProperty, consumerThreadProperty);
+ }
+
private String getPriority() {
return properties.getOrDefault(LoadStmt.PRIORITY,
Priority.NORMAL.name());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index add191001f9..62c005ca280 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -218,6 +218,13 @@ public class MTMVJob extends AbstractJob<MTMVTask,
MTMVTaskContext> {
return data;
}
+ /**
+  * Builds the queue-full message for an MTMV task, naming the fe.conf properties
+  * {@code mtmv_task_queue_size} and {@code job_mtmv_task_consumer_thread_num}.
+  *
+  * @param taskId id of the task that could not be dispatched
+  * @return the message produced by {@code commonFormatMsgWhenExecuteQueueFull}
+  */
+ @Override
+ public String formatMsgWhenExecuteQueueFull(Long taskId) {
+     final String queueSizeProperty = "mtmv_task_queue_size";
+     final String consumerThreadProperty = "job_mtmv_task_consumer_thread_num";
+     return commonFormatMsgWhenExecuteQueueFull(taskId, queueSizeProperty, consumerThreadProperty);
+ }
+
@Override
public TRow getTvfInfo() {
TRow trow = new TRow();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index b1ccb976443..cc82b59a36a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -65,8 +65,8 @@ public class TaskDisruptorGroupManager<T extends
AbstractTask> {
private static final int DISPATCH_MTMV_THREAD_NUM =
Config.job_mtmv_task_consumer_thread_num > 0
? Config.job_mtmv_task_consumer_thread_num :
DEFAULT_CONSUMER_THREAD_NUM;
- private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE =
DEFAULT_RING_BUFFER_SIZE;
- private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE =
DEFAULT_RING_BUFFER_SIZE;
+ private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE =
normalizeRingbufferSize(Config.insert_task_queue_size);
+ private static final int DISPATCH_MTMV_TASK_QUEUE_SIZE =
normalizeRingbufferSize(Config.mtmv_task_queue_size);
public void init() {
@@ -133,4 +133,24 @@ public class TaskDisruptorGroupManager<T extends
AbstractTask> {
}
+ /**
+  * Normalizes the given size to a power of two usable as a ring buffer capacity.
+  *
+  * <p>A size below 1 yields {@code DEFAULT_RING_BUFFER_SIZE}. A size that is already a power
+  * of two is returned unchanged; any other size is rounded up to the next power of two.
+  * Sizes above 2^30 are capped at 2^30, the largest power of two an {@code int} can hold,
+  * instead of overflowing to a negative number.
+  *
+  * @param size The input size to be normalized.
+  * @return The smallest power of two greater than or equal to the input size, capped at
+  *         2^30, or {@code DEFAULT_RING_BUFFER_SIZE} when the input is less than 1.
+  */
+ public static int normalizeRingbufferSize(int size) {
+     if (size < 1) {
+         return DEFAULT_RING_BUFFER_SIZE;
+     }
+     // 1 << 30 is the largest power of two representable as a positive int; rounding a
+     // larger value up would overflow to Integer.MIN_VALUE.
+     final int maxRingBufferSize = 1 << 30;
+     if (size >= maxRingBufferSize) {
+         return maxRingBufferSize;
+     }
+     if (size == 1) {
+         return 1;
+     }
+     // highestOneBit(size - 1) is the largest power of two <= size - 1, so doubling it
+     // gives the smallest power of two >= size.
+     return Integer.highestOneBit(size - 1) << 1;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 862b85597cd..ea0c263a5ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -168,8 +168,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task,
job.getJobType(),
job.getJobConfig())) {
- throw new JobException("dispatch instant task failed, job id
is "
- + job.getJobId() + ", task id is " + task.getTaskId());
+ throw new
JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
+
}
log.info("dispatch instant job, job id is {}, job name is {}, task
id is {}", job.getJobId(),
job.getJobName(), task.getTaskId());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java
new file mode 100644
index 00000000000..661bdcdcca2
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/manager/TaskDisruptorGroupManagerTest.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.manager;
+
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TaskDisruptorGroupManagerTest {
+
+    /**
+     * Verifies that {@code normalizeRingbufferSize} falls back to the default size for
+     * inputs below 1, keeps powers of two unchanged, and rounds other inputs up to the
+     * next power of two.
+     */
+    @Test
+    public void testNormalizeRingbufferSize() {
+        // Inputs below 1 fall back to the default size.
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(0));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-1));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(-8));
+
+        // Powers of two, including the smallest ones, are returned unchanged.
+        Assertions.assertEquals(1, TaskDisruptorGroupManager.normalizeRingbufferSize(1));
+        Assertions.assertEquals(2, TaskDisruptorGroupManager.normalizeRingbufferSize(2));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1024));
+
+        // Everything else is rounded up to the next power of two.
+        Assertions.assertEquals(4, TaskDisruptorGroupManager.normalizeRingbufferSize(3));
+        Assertions.assertEquals(16, TaskDisruptorGroupManager.normalizeRingbufferSize(15));
+        Assertions.assertEquals(1024, TaskDisruptorGroupManager.normalizeRingbufferSize(1023));
+        Assertions.assertEquals(2048, TaskDisruptorGroupManager.normalizeRingbufferSize(1025));
+        Assertions.assertEquals(4096, TaskDisruptorGroupManager.normalizeRingbufferSize(2049));
+    }
+}
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index e9448b34014..625012f9a65 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -67,6 +67,7 @@ label_keep_max_second = 300
# job test configurations
#allows the creation of jobs with an interval of second
enable_job_schedule_second_for_test = true
+mtmv_task_queue_size = 4096
enable_workload_group = true
publish_topic_info_interval_ms = 1000
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]