levy5307 commented on code in PR #17615:
URL: https://github.com/apache/doris/pull/17615#discussion_r1133047967


##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group.h"
+#include "pipeline/pipeline_task.h"
+
+namespace doris {
+namespace taskgroup {
+
+pipeline::PipelineTask* TaskGroupEntity::take() {
+    if (_queue.empty()) {
+        return nullptr;
+    }
+    auto task = _queue.front();
+    _queue.pop();
+    return task;
+}
+
+void TaskGroupEntity::incr_runtime_ns(int64_t runtime_ns)  {
+    auto v_time = runtime_ns / _rs->cpu_share();
+    _vruntime_ns += v_time;
+}
+
+void TaskGroupEntity::adjust_vruntime_ns(int64_t vruntime_ns) {
+    _vruntime_ns = vruntime_ns;
+}
+
+void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
+    _queue.emplace(task);
+}
+
+int TaskGroupEntity::cpu_share() const {
+    return _rs->cpu_share();
+}
+
+TaskGroup::TaskGroup(uint64_t id, std::string name, int cpu_share)
+        : _id(id), _name(name), _cpu_share(cpu_share), _task_entry(this) {}
+
+} // namespace taskgroup
+} // namespace doris

Review Comment:
   Add a blank line



##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group.h"
+#include "pipeline/pipeline_task.h"
+
+namespace doris {
+namespace taskgroup {
+
+pipeline::PipelineTask* TaskGroupEntity::take() {
+    if (_queue.empty()) {
+        return nullptr;
+    }
+    auto task = _queue.front();
+    _queue.pop();
+    return task;
+}
+
+void TaskGroupEntity::incr_runtime_ns(int64_t runtime_ns)  {

Review Comment:
   What about change all of these `int64_t` to `uint64_t` here?



##########
be/src/runtime/task_group/task_group.h:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {
+public:
+    explicit TaskGroupEntity(taskgroup::TaskGroup* rs) : _rs(rs) {}
+    void push_back(pipeline::PipelineTask* task);

Review Comment:
   It’s better to use smart pointer instead of raw pointer



##########
be/src/runtime/task_group/task_group_manager.h:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <shared_mutex>
+#include <unordered_map>
+
+#include "task_group.h"
+
+namespace doris::taskgroup {
+
+class TaskGroupManager {
+    DECLARE_SINGLETON(TaskGroupManager)
+public:
+    // TODO pipeline task group
+    TaskGroupPtr get_or_create_task_group(uint64_t id);
+
+    static constexpr uint64_t DEFAULT_RG_ID = 0;

Review Comment:
   What about using `enum class` to define resource group id?



##########
be/src/runtime/task_group/task_group_manager.cpp:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group_manager.h"
+
+namespace doris::taskgroup {
+
+TaskGroupManager::TaskGroupManager() {
+    _create_default_task_group();
+    _create_poc_task_group();
+}
+TaskGroupManager::~TaskGroupManager() = default;
+
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(uint64_t id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.count(id)) {
+        return _task_groups[id];
+    } else {
+        return _task_groups[DEFAULT_RG_ID];
+    }
+}
+
+void TaskGroupManager::_create_default_task_group() {
+    _task_groups[DEFAULT_RG_ID] =
+            std::make_shared<TaskGroup>(DEFAULT_RG_ID, "default_rs", 
DEFAULT_CPU_SHARE);
+}
+
+void TaskGroupManager::_create_poc_task_group() {
+    _task_groups[POC_RG_ID] =
+            std::make_shared<TaskGroup>(POC_RG_ID, "poc_rs", POC_RG_CPU_SHARE);
+}
+
+}

Review Comment:
   Add a blank line after here



##########
be/src/runtime/task_group/task_group.h:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {
+public:
+    explicit TaskGroupEntity(taskgroup::TaskGroup* rs) : _rs(rs) {}
+    void push_back(pipeline::PipelineTask* task);
+    int64_t vruntime_ns() const { return _vruntime_ns; }
+
+    pipeline::PipelineTask* take();
+
+    void incr_runtime_ns(int64_t runtime_ns);
+
+    void adjust_vruntime_ns(int64_t vruntime_ns);
+
+    size_t task_size() { return _queue.size(); }
+
+    int cpu_share() const;

Review Comment:
   same with func `task_size`



##########
be/src/runtime/task_group/task_group.h:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {
+public:
+    explicit TaskGroupEntity(taskgroup::TaskGroup* rs) : _rs(rs) {}
+    void push_back(pipeline::PipelineTask* task);
+    int64_t vruntime_ns() const { return _vruntime_ns; }
+
+    pipeline::PipelineTask* take();
+
+    void incr_runtime_ns(int64_t runtime_ns);
+
+    void adjust_vruntime_ns(int64_t vruntime_ns);
+
+    size_t task_size() { return _queue.size(); }

Review Comment:
   ```
       size_t task_size() const { return _queue.size(); }
   ```
   



##########
be/src/runtime/task_group/task_group.h:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {
+public:
+    explicit TaskGroupEntity(taskgroup::TaskGroup* rs) : _rs(rs) {}
+    void push_back(pipeline::PipelineTask* task);
+    int64_t vruntime_ns() const { return _vruntime_ns; }
+
+    pipeline::PipelineTask* take();
+
+    void incr_runtime_ns(int64_t runtime_ns);
+
+    void adjust_vruntime_ns(int64_t vruntime_ns);
+
+    size_t task_size() { return _queue.size(); }
+
+    int cpu_share() const;
+
+private:
+    // TODO pipeline use MLFQ
+    std::queue<pipeline::PipelineTask*> _queue;
+    taskgroup::TaskGroup* _rs;
+    int64_t _vruntime_ns = 0;

Review Comment:
   uint64_t



##########
be/src/runtime/task_group/task_group_manager.cpp:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group_manager.h"
+
+namespace doris::taskgroup {
+
+TaskGroupManager::TaskGroupManager() {
+    _create_default_task_group();
+    _create_poc_task_group();
+}
+TaskGroupManager::~TaskGroupManager() = default;
+
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(uint64_t id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.count(id)) {

Review Comment:
   ```
       if (_task_groups.count(id))
   ```
   ==>
   ```
       if (_task_groups.count(id) > 0)
   ```



##########
be/src/runtime/task_group/task_group_manager.h:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <shared_mutex>
+#include <unordered_map>
+
+#include "task_group.h"

Review Comment:
   ```
   using TaskGroupPtr = std::shared_ptr<TaskGroup>;
   ```
   Using this line instead. It's better to reduce the count of including files.



##########
be/src/runtime/task_group/task_group_manager.cpp:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group_manager.h"
+
+namespace doris::taskgroup {
+
+TaskGroupManager::TaskGroupManager() {
+    _create_default_task_group();
+    _create_poc_task_group();
+}
+TaskGroupManager::~TaskGroupManager() = default;
+
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(uint64_t id) {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    if (_task_groups.count(id)) {
+        return _task_groups[id];
+    } else {
+        return _task_groups[DEFAULT_RG_ID];
+    }
+}
+
+void TaskGroupManager::_create_default_task_group() {
+    _task_groups[DEFAULT_RG_ID] =
+            std::make_shared<TaskGroup>(DEFAULT_RG_ID, "default_rs", 
DEFAULT_CPU_SHARE);
+}
+
+void TaskGroupManager::_create_poc_task_group() {
+    _task_groups[POC_RG_ID] =
+            std::make_shared<TaskGroup>(POC_RG_ID, "poc_rs", POC_RG_CPU_SHARE);
+}
+
+}

Review Comment:
   ```// namespace doris::taskgroup```



##########
be/src/runtime/task_group/task_group_manager.cpp:
##########
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "task_group_manager.h"
+
+namespace doris::taskgroup {
+
+TaskGroupManager::TaskGroupManager() {
+    _create_default_task_group();
+    _create_poc_task_group();
+}
+TaskGroupManager::~TaskGroupManager() = default;
+
+TaskGroupPtr TaskGroupManager::get_or_create_task_group(uint64_t id) {

Review Comment:
   The name of this function is misleading, because you didn't create task 
group here.



##########
be/src/runtime/task_group/task_group.h:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+#include <queue>
+
+#include "olap/olap_define.h"
+
+namespace doris {
+
+namespace pipeline {
+class PipelineTask;
+}
+
+class QueryFragmentsCtx;
+
+namespace taskgroup {
+
+class TaskGroup;
+
+class TaskGroupEntity {

Review Comment:
   +1



##########
be/src/runtime/query_fragments_ctx.h:
##########
@@ -122,6 +123,10 @@ class QueryFragmentsCtx {
 
     vectorized::RuntimePredicate& get_runtime_predicate() { return 
_runtime_predicate; }
 
+    void set_task_group(taskgroup::TaskGroupPtr& rs_group) { _task_group = 
rs_group; }
+
+    taskgroup::TaskGroup* get_task_group() { return _task_group.get(); }

Review Comment:
   What about return a `weak_ptr`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to