This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a8669020af [fix](group commit) Fix some group commit case (#30132)
4a8669020af is described below

commit 4a8669020af171750e6c879eb5bac046be861c02
Author: meiyi <myime...@gmail.com>
AuthorDate: Sun Jan 21 11:42:25 2024 +0800

    [fix](group commit) Fix some group commit case (#30132)
---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 15 ++++++-------
 .../java/org/apache/doris/alter/SystemHandler.java | 11 +++++-----
 .../apache/doris/analysis/NativeInsertStmt.java    |  2 +-
 .../doris/httpv2/rest/CheckWalSizeAction.java      |  3 +--
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 23 +++++++++-----------
 .../org/apache/doris/load/GroupCommitManager.java  | 25 +++++++++-------------
 .../apache/doris/planner/GroupCommitPlanner.java   |  3 ---
 gensrc/proto/internal_service.proto                |  6 +++---
 8 files changed, 38 insertions(+), 50 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index eef902dea82..5c74164ae33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -48,7 +48,6 @@ import org.apache.doris.common.SchemaVersionAndHash;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DbUtil;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTask;
@@ -602,8 +601,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
     private void waitWalFinished() {
         // wait wal done here
-        Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, 
SchemaChangeStatus.BLOCK);
-        LOG.info("block table {}", tableId);
+        Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
+        LOG.info("block group commit for table={} when schema change", 
tableId);
         List<Long> aliveBeIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
         long expireTime = System.currentTimeMillis() + 
Config.check_wal_queue_timeout_threshold;
         while (true) {
@@ -611,21 +610,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
                     .isPreviousWalFinished(tableId, aliveBeIds);
             if (walFinished) {
-                LOG.info("all wal is finished");
+                LOG.info("all wal is finished for table={}", tableId);
                 break;
             } else if (System.currentTimeMillis() > expireTime) {
-                LOG.warn("waitWalFinished time out");
+                LOG.warn("waitWalFinished time out for table={}", tableId);
                 break;
             } else {
                 try {
                     Thread.sleep(100);
                 } catch (InterruptedException ie) {
-                    LOG.info("schema change job sleep wait for wal 
InterruptedException: ", ie);
+                    LOG.warn("failed to wait for wal for table={} when schema 
change", tableId, ie);
                 }
             }
         }
-        Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, 
SchemaChangeStatus.NORMAL);
-        LOG.info("release table {}", tableId);
+        Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
+        LOG.info("unblock group commit for table={} when schema change", 
tableId);
     }
 
     private void onFinished(OlapTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 935dcf36293..86551ba0735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -83,7 +83,8 @@ public class SystemHandler extends AlterHandler {
             }
 
             List<Long> backendTabletIds = 
invertedIndex.getTabletIdsByBackendId(beId);
-            if (Config.drop_backend_after_decommission && checkTablets(beId, 
backendTabletIds) && checkWal(backend)) {
+            boolean hasWal = checkWal(backend);
+            if (Config.drop_backend_after_decommission && checkTablets(beId, 
backendTabletIds) && hasWal) {
                 try {
                     systemInfoService.dropBackend(beId);
                     LOG.info("no available tablet on decommission backend {}, 
drop it", beId);
@@ -94,8 +95,9 @@ public class SystemHandler extends AlterHandler {
                 continue;
             }
 
-            LOG.info("backend {} lefts {} replicas to decommission: {}", beId, 
backendTabletIds.size(),
-                    backendTabletIds.subList(0, Math.min(10, 
backendTabletIds.size())));
+            LOG.info("backend {} lefts {} replicas to decommission: {}{}", 
beId, backendTabletIds.size(),
+                    backendTabletIds.subList(0, Math.min(10, 
backendTabletIds.size())),
+                    hasWal ? "; and has unfinished WALs" : "");
         }
     }
 
@@ -197,8 +199,7 @@ public class SystemHandler extends AlterHandler {
     }
 
     private boolean checkWal(Backend backend) {
-        return Env.getCurrentEnv().getGroupCommitManager()
-                .getAllWalQueueSize(backend) == 0;
+        return 
Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend) == 0;
     }
 
     private List<Backend> checkDecommission(DecommissionBackendClause 
decommissionBackendClause)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index d69ca40ceca..96dbb2e0edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1140,7 +1140,7 @@ public class NativeInsertStmt extends InsertStmt {
             return;
         }
         boolean partialUpdate = 
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
-        if (!partialUpdate && 
ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
+        if (!isExplain() && !partialUpdate && 
ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
                 && ConnectContext.get().getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
                 && targetTable instanceof OlapTable
                 && ((OlapTable) 
targetTable).getTableProperty().getUseSchemaLightChange()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
index f7822580fb7..fdc39e8badd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
@@ -84,8 +84,7 @@ public class CheckWalSizeAction extends RestBaseController {
             List<Backend> backends = getBackends(hostInfos);
             List<String> backendsList = new ArrayList<>();
             for (Backend backend : backends) {
-                long size = Env.getCurrentEnv().getGroupCommitManager()
-                        .getAllWalQueueSize(backend);
+                long size = 
Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
                 backendsList.add(backend.getHost() + ":" + 
backend.getHeartbeatPort() + ":" + size);
             }
             return ResponseEntityBuilder.ok(backendsList);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index a78a7e9fa58..6952bd37b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -88,12 +88,11 @@ public class LoadAction extends RestBaseController {
                              @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
+        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
             groupCommit = true;
             try {
-                String[] pair = new String[] {db, table};
-                if (isGroupCommitBlock(pair)) {
-                    String msg = "insert table " + pair[1] + " is blocked on 
schema change";
+                if (isGroupCommitBlock(db, table)) {
+                    String msg = "insert table " + table + " is blocked on 
schema change";
                     return new RestBaseResult(msg);
                 }
             } catch (Exception e) {
@@ -122,19 +121,17 @@ public class LoadAction extends RestBaseController {
         }
     }
 
-    @RequestMapping(path = "/api/_http_stream",
-                        method = RequestMethod.PUT)
-    public Object streamLoadWithSql(HttpServletRequest request,
-                             HttpServletResponse response) {
+    @RequestMapping(path = "/api/_http_stream", method = RequestMethod.PUT)
+    public Object streamLoadWithSql(HttpServletRequest request, 
HttpServletResponse response) {
         String sql = request.getHeader("sql");
         LOG.info("streaming load sql={}", sql);
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
+        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = parseDbAndTb(sql);
-                if (isGroupCommitBlock(pair)) {
+                if (isGroupCommitBlock(pair[0], pair[1])) {
                     String msg = "insert table " + pair[1] + " is blocked on 
schema change";
                     return new RestBaseResult(msg);
                 }
@@ -164,11 +161,11 @@ public class LoadAction extends RestBaseController {
         }
     }
 
-    private boolean isGroupCommitBlock(String[] pair) throws TException {
-        String fullDbName = getFullDbName(pair[0]);
+    private boolean isGroupCommitBlock(String db, String table) throws 
TException {
+        String fullDbName = getFullDbName(db);
         Database dbObj = Env.getCurrentInternalCatalog()
                 .getDbOrException(fullDbName, s -> new TException("database is 
invalid for dbName: " + s));
-        Table tblObj = dbObj.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
+        Table tblObj = dbObj.getTableOrException(table, s -> new 
TException("table is invalid: " + s));
         return 
Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId());
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 3b9719b2594..12410945e9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load;
 
-
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -30,31 +29,27 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 public class GroupCommitManager {
 
-    public enum SchemaChangeStatus {
-        BLOCK, NORMAL
-    }
-
     private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
 
-    private final Map<Long, SchemaChangeStatus> statusMap = new 
ConcurrentHashMap<>();
+    private Set<Long> blockedTableIds = new HashSet<>();
 
     public boolean isBlock(long tableId) {
-        if (statusMap.containsKey(tableId)) {
-            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
-        }
-        return false;
+        return blockedTableIds.contains(tableId);
+    }
+
+    public void blockTable(long tableId) {
+        blockedTableIds.add(tableId);
     }
 
-    public void setStatus(long tableId, SchemaChangeStatus status) {
-        LOG.debug("Setting status for tableId {}: {}", tableId, status);
-        statusMap.put(tableId, status);
+    public void unblockTable(long tableId) {
+        blockedTableIds.remove(tableId);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 8b9f6b18331..b69ece3b9ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -153,9 +153,6 @@ public class GroupCommitPlanner {
             }
         }
         PGroupCommitInsertRequest request = 
PGroupCommitInsertRequest.newBuilder()
-                .setDbId(db.getId())
-                .setTableId(table.getId())
-                .setBaseSchemaVersion(table.getBaseSchemaVersion())
                 
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
                         .setRequest(execPlanFragmentParamsBytes)
                         
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 433144b304b..cf45d039522 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -766,11 +766,11 @@ message PGlobResponse {
 }
 
 message PGroupCommitInsertRequest {
-    optional int64 db_id = 1;
-    optional int64 table_id = 2;
+    optional int64 db_id = 1; // deprecated
+    optional int64 table_id = 2; // deprecated
     //  Descriptors.TDescriptorTable
     // optional bytes desc_tbl = 3;
-    optional int64 base_schema_version = 4;
+    optional int64 base_schema_version = 4; // deprecated
 
     // TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan
     // optional bytes plan_node = 5;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to