morningman commented on a change in pull request #7521:
URL: https://github.com/apache/incubator-doris/pull/7521#discussion_r785299056



##########
File path: be/src/olap/olap_server.cpp
##########
@@ -551,4 +537,51 @@ void 
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
     }
 }
 
+Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, 
CompactionType compaction_type) {
+    bool already_exist = _push_tablet_into_submitted_compaction(tablet, 
compaction_type);
+    if (already_exist) {
+        return Status::InternalError(strings::Substitute(
+                "compaction task has already been submitted, tablet_id=$0, 
compaction_type=$1.",
+                tablet->tablet_id(), compaction_type));
+    }
+    int64_t permits =
+            tablet->prepare_compaction_and_calculate_permits(compaction_type, 
tablet);
+    if (permits > 0 && _permit_limiter.request(permits)) {
+        auto st = _compaction_thread_pool->submit_func([=]() {
+          CgroupsMgr::apply_system_cgroup();
+          tablet->execute_compaction(compaction_type);
+          _permit_limiter.release(permits);
+          // reset compaction
+          tablet->reset_compaction(compaction_type);
+          _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+        });
+        if (!st.ok()) {
+            _permit_limiter.release(permits);
+            // reset compaction
+            tablet->reset_compaction(compaction_type);
+            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+            return Status::InternalError(strings::Substitute(
+                    "failed to submit compaction task to thread pool, 
tablet_id=$0, compaction_type=$1.",
+                    tablet->tablet_id(), compaction_type));
+        }
+        return Status::OK();
+    } else {
+        // reset compaction
+        tablet->reset_compaction(compaction_type);
+        _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+        return Status::InternalError(strings::Substitute(
+                "failed to prepare compaction task and calculate permits, 
tablet_id=$0, compaction_type=$1.",
+                tablet->tablet_id(), compaction_type));
+    }
+}
+
+Status StorageEngine::submit_table_compaction_task(TabletSharedPtr tablet, 
CompactionType compaction_type) {

Review comment:
       ```suggestion
   Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, 
CompactionType compaction_type) {
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -7257,4 +7259,46 @@ public static boolean isStoredTableNamesLowerCase() {
     public static boolean isTableNamesCaseInsensitive() {
         return GlobalVariable.lowerCaseTableNames == 2;
     }
+
+    public void compactTable(AdminCompactTableStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTblName();
+
+        String type = stmt.getCompactionType();
+        if (type == null || (!type.equals("base") && 
!type.equals("cumulative"))) {
+            throw new DdlException("compaction type should be [BASE] or 
[CUMULATIVE]");
+        }
+
+        Database db = this.getDbOrDdlException(dbName);
+        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
+
+        olapTable.writeLock();

Review comment:
       A read lock (`readLock`) is enough here.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -7257,4 +7259,46 @@ public static boolean isStoredTableNamesLowerCase() {
     public static boolean isTableNamesCaseInsensitive() {
         return GlobalVariable.lowerCaseTableNames == 2;
     }
+
+    public void compactTable(AdminCompactTableStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTblName();
+
+        String type = stmt.getCompactionType();
+        if (type == null || (!type.equals("base") && 
!type.equals("cumulative"))) {

Review comment:
       This check should be done in the analysis phase.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -7257,4 +7259,46 @@ public static boolean isStoredTableNamesLowerCase() {
     public static boolean isTableNamesCaseInsensitive() {
         return GlobalVariable.lowerCaseTableNames == 2;
     }
+
+    public void compactTable(AdminCompactTableStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTblName();
+
+        String type = stmt.getCompactionType();
+        if (type == null || (!type.equals("base") && 
!type.equals("cumulative"))) {
+            throw new DdlException("compaction type should be [BASE] or 
[CUMULATIVE]");
+        }
+
+        Database db = this.getDbOrDdlException(dbName);
+        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
+
+        olapTable.writeLock();
+        try {
+            AgentBatchTask batchTask = new AgentBatchTask();
+            List<String> partitionNames = stmt.getPartitions();
+            LOG.info("Table compaction. database: {}, table: {}, partition: 
{}, type: {}", dbName, tableName,
+                    Joiner.on(", ").join(partitionNames), type);
+            for (String parName : partitionNames) {
+                Partition partition = olapTable.getPartition(parName);
+                if (partition == null) {
+                    throw new DdlException("partition[" + parName + "] not 
exist in table[" + tableName + "]");
+                }
+
+                for (MaterializedIndex idx : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    for (Tablet tablet : idx.getTablets()) {
+                        for (Replica replica : tablet.getReplicas()) {
+                            CompactionTask compactionTask = new 
CompactionTask(replica.getBackendId(), db.getId(),
+                                    olapTable.getId(), partition.getId(), 
idx.getId(), tablet.getId(),
+                                    
olapTable.getSchemaHashByIndexId(idx.getId()), type, 5000);
+                            batchTask.addTask(compactionTask);
+                        }
+                    }
+                } // indices
+            }
+            // send task immediately
+            AgentTaskExecutor.submit(batchTask);

Review comment:
       Submit the task outside the lock.

##########
File path: be/src/agent/task_worker_pool.cpp
##########
@@ -1650,4 +1654,73 @@ void TaskWorkerPool::_random_sleep(int second) {
     sleep(rnd.Uniform(second) + 1);
 }
 
+void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        TCompactionReq compaction_req;
+
+        {
+            lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
+            while (_is_work && _tasks.empty()) {
+                _worker_thread_condition_variable.wait();
+            }
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            compaction_req = agent_task_req.compaction_req;
+            _tasks.pop_front();
+        }
+
+        LOG(INFO) << "get compaction task. signature:" << 
agent_task_req.signature
+                  << ", compaction type:" << compaction_req.type;
+
+        CompactionType compaction_type;
+        if (compaction_req.type == "base") {
+            compaction_type = CompactionType::BASE_COMPACTION;
+        } else {
+            compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+        }
+
+        TabletSharedPtr tablet_ptr = 
StorageEngine::instance()->tablet_manager()->get_tablet(
+                compaction_req.tablet_id, compaction_req.schema_hash);
+        if (tablet_ptr != nullptr) {
+            auto data_dir = tablet_ptr->data_dir();
+            if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), 
compaction_type)) {
+                LOG(WARNING) << "can not do compaction: " << 
tablet_ptr->tablet_id()
+                             << ", compaction type: " << compaction_type;
+                _remove_task_info(agent_task_req.task_type, 
agent_task_req.signature);
+                continue;
+            }
+
+            if (compaction_type == CompactionType::BASE_COMPACTION) {

Review comment:
       Is it necessary to check the lock here?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -7257,4 +7259,46 @@ public static boolean isStoredTableNamesLowerCase() {
     public static boolean isTableNamesCaseInsensitive() {
         return GlobalVariable.lowerCaseTableNames == 2;
     }
+
+    public void compactTable(AdminCompactTableStmt stmt) throws DdlException {
+        String dbName = stmt.getDbName();
+        String tableName = stmt.getTblName();
+
+        String type = stmt.getCompactionType();
+        if (type == null || (!type.equals("base") && 
!type.equals("cumulative"))) {
+            throw new DdlException("compaction type should be [BASE] or 
[CUMULATIVE]");
+        }
+
+        Database db = this.getDbOrDdlException(dbName);
+        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
+
+        olapTable.writeLock();
+        try {
+            AgentBatchTask batchTask = new AgentBatchTask();
+            List<String> partitionNames = stmt.getPartitions();
+            LOG.info("Table compaction. database: {}, table: {}, partition: 
{}, type: {}", dbName, tableName,
+                    Joiner.on(", ").join(partitionNames), type);
+            for (String parName : partitionNames) {
+                Partition partition = olapTable.getPartition(parName);
+                if (partition == null) {
+                    throw new DdlException("partition[" + parName + "] not 
exist in table[" + tableName + "]");
+                }
+
+                for (MaterializedIndex idx : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                    for (Tablet tablet : idx.getTablets()) {
+                        for (Replica replica : tablet.getReplicas()) {
+                            CompactionTask compactionTask = new 
CompactionTask(replica.getBackendId(), db.getId(),
+                                    olapTable.getId(), partition.getId(), 
idx.getId(), tablet.getId(),
+                                    
olapTable.getSchemaHashByIndexId(idx.getId()), type, 5000);
+                            batchTask.addTask(compactionTask);
+                        }
+                    }
+                } // indices
+            }
+            // send task immediately
+            AgentTaskExecutor.submit(batchTask);

Review comment:
       Is the timeout (the `5000` passed to `CompactionTask`) really 5000 seconds?
   I think the timeout can be calculated by the BE.

##########
File path: docs/en/sql-reference/sql-statements/Administration/ADMIN COMPACT.md
##########
@@ -0,0 +1,52 @@
+---

Review comment:
       New docs need to be added to `docs/.vuepress/sidebar/en.js` and 
`zh-CN.js`.

##########
File path: gensrc/thrift/AgentService.thrift
##########
@@ -160,6 +160,13 @@ struct TCloneReq {
     10: optional i32 timeout_s;
 }
 
+struct TCompactionReq {
+    1: required Types.TTabletId tablet_id

Review comment:
       Use `optional` for all fields.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to