ylcn91 commented on code in PR #26359:
URL: https://github.com/apache/doris/pull/26359#discussion_r1382345896


##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();

Review Comment:
   I think it's good practice to make "statusMap" final, since the field is never reassigned after initialization (only the map's contents change, via setStatus).



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {

Review Comment:
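   This while loop could be extracted into a separate method that handles retries, for better readability.
   Also, we are not handling the case where we exit the loop because the timeout is reached: `size` then stays 0, so that backend is silently treated as having an empty WAL queue, and isPreviousWalFinished can return true even though the WAL size was never actually read.
   
   Could we consider using a ScheduledExecutorService to handle repeated tasks with delays more cleanly?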



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String msg = "get wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                            + response.getStatus();
+                    LOG.warn(msg);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                size = response.getSize();
+                done = true;
+            }
+            if (size > 0) {
+                LOG.info("backend id:" + backend.getId() + ",wal size:" + 
size);
+                empty = false;
+            }
+        }
+        return empty;
+    }
+

Review Comment:
   The method that follows (getAllWalQueueSize) shares most of its retry logic with isPreviousWalFinished; both could benefit from a common retry mechanism.



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * cal wal size of specific be
+ * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return:
+ * {
+ * "msg": "OK",
+ * "code": 0,
+ * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
+ * "count": 0
+ * }
+ */
+
+@RestController
+public class CheckWalSizeAction extends RestBaseController {
+    public static final String HOST_PORTS = "host_ports";
+
+    @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
+    public Object execute(HttpServletRequest request, HttpServletResponse 
response) {
+        // check user auth
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.OPERATOR);
+
+        String hostPorts = request.getParameter(HOST_PORTS);
+        if (Strings.isNullOrEmpty(hostPorts)) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        String[] hostPortArr = hostPorts.split(",");
+        if (hostPortArr.length == 0) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");

Review Comment:
   The string "No host:port specified" is used twice, which makes it a 
candidate for a constant to avoid duplication and facilitate changes.



##########
fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java:
##########
@@ -779,13 +796,28 @@ private void replayCreateJob(SchemaChangeJobV2 
replayedJob) throws MetaNotFoundE
 
             // set table state
             olapTable.setState(OlapTableState.SCHEMA_CHANGE);
+            Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, 
SchemaChangeStatus.BLOCK);
         } finally {
             olapTable.writeUnlock();
         }
 
         this.watershedTxnId = replayedJob.watershedTxnId;
         jobState = JobState.WAITING_TXN;
         LOG.info("replay pending schema change job: {}, table id: {}", jobId, 
tableId);
+        // wait wal done here
+        List<Long> aliveBeIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+        boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
+                .isPreviousWalFinished(tableId, watershedTxnId, aliveBeIds);
+        while (!walFinished) {
+            LOG.info("wai for wal queue size to be empty");
+            walFinished = Env.getCurrentEnv().getGroupCommitManager()
+                    .isPreviousWalFinished(tableId, watershedTxnId, 
aliveBeIds);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException ie) {
+                LOG.info("schema change job sleep wait InterruptedException: 
", ie);

Review Comment:
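   When catching an InterruptedException, it's good practice to restore the thread's interrupt status with Thread.currentThread().interrupt(); otherwise the interruption is silently swallowed and this wait loop just keeps running.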



##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -1806,16 +1806,37 @@ private void handleInsertStmt() throws Exception {
             label = context.getTxnEntry().getLabel();
             txnId = context.getTxnEntry().getTxnConf().getTxnId();
         } else if (insertStmt instanceof NativeInsertStmt && 
((NativeInsertStmt) insertStmt).isGroupCommit()) {
+            while 
(Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId()))
 {
+                LOG.info("insert table " + insertStmt.getTargetTable().getId() 
+ " is blocked");

Review Comment:
   Prefer parameterized logging over string concatenation:
    LOG.info("insert table {} is blocked", insertStmt.getTargetTable().getId());



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * cal wal size of specific be
+ * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return:
+ * {
+ * "msg": "OK",
+ * "code": 0,
+ * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
+ * "count": 0
+ * }
+ */
+
+@RestController
+public class CheckWalSizeAction extends RestBaseController {
+    public static final String HOST_PORTS = "host_ports";
+
+    @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
+    public Object execute(HttpServletRequest request, HttpServletResponse 
response) {
+        // check user auth
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.OPERATOR);
+
+        String hostPorts = request.getParameter(HOST_PORTS);
+        if (Strings.isNullOrEmpty(hostPorts)) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        String[] hostPortArr = hostPorts.split(",");
+        if (hostPortArr.length == 0) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        List<HostInfo> hostInfos = Lists.newArrayList();
+        for (String hostPort : hostPortArr) {
+            try {
+                HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
+                hostInfos.add(hostInfo);
+            } catch (AnalysisException e) {
+                return ResponseEntityBuilder.badRequest(e.getMessage());

Review Comment:
   Returning immediately inside the loop stops processing the remaining hosts as soon as one fails. Consider aggregating all errors and returning a comprehensive response.



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {

Review Comment:
   Using "synchronized" on the method level is a simple locking mechanism but 
can be a performance bottleneck.
   Consider using ConcurrentHashMap or other concurrent collections for better 
scalability.



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);

Review Comment:
   Logging here can help trace status changes, which can be critical for debugging.
   -> LOG.debug("Setting status for tableId {}: {}", tableId, status);



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String msg = "get wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                            + response.getStatus();
+                    LOG.warn(msg);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                size = response.getSize();
+                done = true;
+            }
+            if (size > 0) {
+                LOG.info("backend id:" + backend.getId() + ",wal size:" + 
size);
+                empty = false;
+            }
+        }
+        return empty;
+    }
+
+    public long getAllWalQueueSize(Backend backend) {
+        PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
+                .build();
+        PGetWalQueueSizeResponse response = null;
+        long start = System.currentTimeMillis();
+        boolean done = false;
+        long size = 0;
+        while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+            try {
+                Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                        .getAllWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                response = future.get();
+            } catch (Exception e) {
+                LOG.warn("encounter exception while getting all wal queue size 
on backend id: " + backend.getId()
+                        + ",exception:" + e);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                }
+                continue;
+            }
+            TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                String msg = "get all wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                        + response.getStatus();
+                LOG.warn(msg);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                }
+                continue;
+            }
+            size = response.getSize();
+            done = true;
+        }
+        if (size > 0) {
+            LOG.info("backend id:" + backend.getId() + ",all wal size:" + 
size);
+        }
+        return size;
+    }
+

Review Comment:
   Consider the use of Optional<TransactionState> to avoid returning null and 
provide a more functional approach.



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);

Review Comment:
   Consider an exponential backoff strategy instead of a fixed 100 ms sleep between retries.



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * cal wal size of specific be
+ * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return:
+ * {
+ * "msg": "OK",
+ * "code": 0,
+ * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
+ * "count": 0
+ * }
+ */
+
+@RestController
+public class CheckWalSizeAction extends RestBaseController {
+    public static final String HOST_PORTS = "host_ports";
+
+    @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
+    public Object execute(HttpServletRequest request, HttpServletResponse 
response) {
+        // check user auth
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.OPERATOR);
+
+        String hostPorts = request.getParameter(HOST_PORTS);
+        if (Strings.isNullOrEmpty(hostPorts)) {

Review Comment:
   Using Guava's `Strings.isNullOrEmpty` to check for a missing or empty parameter is fine,
   but Spring's built-in request-parameter binding and validation (e.g. `@RequestParam`) could be used here instead.



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String msg = "get wal queue size fail,backend id: " + 
backend.getId() + ", status: "

Review Comment:
   Depending on how severe this situation is, it could be logged at a level other than warn.
   Either way, prefer a parameterized message:
   `LOG.warn("Failed to get WAL queue size, backend id: {}, status: {}", backend.getId(), response.getStatus());`
   Also consider using exponential backoff or a configurable delay before retrying.



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * cal wal size of specific be
+ * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return:
+ * {
+ * "msg": "OK",
+ * "code": 0,
+ * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
+ * "count": 0
+ * }
+ */
+
+@RestController
+public class CheckWalSizeAction extends RestBaseController {
+    public static final String HOST_PORTS = "host_ports";
+
+    @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
+    public Object execute(HttpServletRequest request, HttpServletResponse 
response) {
+        // check user auth
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.OPERATOR);
+
+        String hostPorts = request.getParameter(HOST_PORTS);
+        if (Strings.isNullOrEmpty(hostPorts)) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        String[] hostPortArr = hostPorts.split(",");
+        if (hostPortArr.length == 0) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        List<HostInfo> hostInfos = Lists.newArrayList();
+        for (String hostPort : hostPortArr) {
+            try {
+                HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
+                hostInfos.add(hostInfo);
+            } catch (AnalysisException e) {
+                return ResponseEntityBuilder.badRequest(e.getMessage());
+            }
+        }
+
+        try {
+            List<Backend> backends = getBackends(hostInfos);
+            List<String> backendsList = new ArrayList<>();
+            for (Backend backend : backends) {

Review Comment:
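   The per-backend WAL size queries could be parallelized. Each `getAllWalQueueSize` call is a blocking RPC that may keep retrying until `Config.check_wal_queue_timeout_threshold` elapses, so querying backends one after another adds their latencies together. Parallelizing is safe as long as the calls share no mutable state.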



##########
fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java:
##########
@@ -1593,6 +1594,8 @@ private void createJob(String rawSql, long dbId, 
OlapTable olapTable, Map<Long,
 
         // set table state
         olapTable.setState(OlapTableState.SCHEMA_CHANGE);
+        Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, 
SchemaChangeStatus.BLOCK);
+        LOG.info("block table:" + tableId);

Review Comment:
   Prefer a parameterized message: `LOG.info("block table: {}", tableId);`



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java:
##########
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.rest;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.system.SystemInfoService.HostInfo;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * cal wal size of specific be
+ * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return:
+ * {
+ * "msg": "OK",
+ * "code": 0,
+ * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"],
+ * "count": 0
+ * }
+ */
+
+@RestController
+public class CheckWalSizeAction extends RestBaseController {
+    public static final String HOST_PORTS = "host_ports";
+
+    @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET)
+    public Object execute(HttpServletRequest request, HttpServletResponse 
response) {
+        // check user auth
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.OPERATOR);
+
+        String hostPorts = request.getParameter(HOST_PORTS);
+        if (Strings.isNullOrEmpty(hostPorts)) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        String[] hostPortArr = hostPorts.split(",");
+        if (hostPortArr.length == 0) {
+            return ResponseEntityBuilder.badRequest("No host:port specified");
+        }
+
+        List<HostInfo> hostInfos = Lists.newArrayList();
+        for (String hostPort : hostPortArr) {
+            try {
+                HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort);
+                hostInfos.add(hostInfo);
+            } catch (AnalysisException e) {
+                return ResponseEntityBuilder.badRequest(e.getMessage());
+            }
+        }
+
+        try {
+            List<Backend> backends = getBackends(hostInfos);
+            List<String> backendsList = new ArrayList<>();
+            for (Backend backend : backends) {
+                long size = Env.getCurrentEnv().getGroupCommitManager()
+                        .getAllWalQueueSize(backend);
+                backendsList.add(backend.getHost() + ":" + 
backend.getHeartbeatPort() + ":" + size);
+            }
+            return ResponseEntityBuilder.ok(backendsList);
+        } catch (DdlException e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());

Review Comment:
   Could we distinguish client errors (4xx) from server errors (5xx) here, instead of reporting every `DdlException` through `okWithCommonError`?



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String msg = "get wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                            + response.getStatus();
+                    LOG.warn(msg);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                size = response.getSize();
+                done = true;
+            }
+            if (size > 0) {
+                LOG.info("backend id:" + backend.getId() + ",wal size:" + 
size);
+                empty = false;
+            }
+        }
+        return empty;
+    }
+
+    public long getAllWalQueueSize(Backend backend) {
+        PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
+                .build();
+        PGetWalQueueSizeResponse response = null;
+        long start = System.currentTimeMillis();
+        boolean done = false;
+        long size = 0;
+        while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+            try {
+                Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                        .getAllWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                response = future.get();
+            } catch (Exception e) {
+                LOG.warn("encounter exception while getting all wal queue size 
on backend id: " + backend.getId()
+                        + ",exception:" + e);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                }
+                continue;
+            }
+            TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                String msg = "get all wal queue size fail,backend id: " + 
backend.getId() + ", status: "

Review Comment:
   Prefer a parameterized message: `LOG.warn("get all wal queue size failed, backend id: {}, status: {}", backend.getId(), response.getStatus());`
   



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
+import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public class GroupCommitManager {
+
+    public enum SchemaChangeStatus {
+        BLOCK, NORMAL
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(GroupCommitManager.class);
+
+    private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>();
+
+    public synchronized boolean isBlock(long tableId) {
+        if (statusMap.containsKey(tableId)) {
+            return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
+        }
+        return false;
+    }
+
+    public synchronized void setStatus(long tableId, SchemaChangeStatus 
status) {
+        statusMap.put(tableId, status);
+    }
+
+    /**
+     * Check the wal before the endTransactionId is finished or not.
+     */
+    public boolean isPreviousWalFinished(long tableId, long endTransactionId, 
List<Long> aliveBeIds) {
+        boolean empty = true;
+        for (int i = 0; i < aliveBeIds.size(); i++) {
+            Backend backend = 
Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i));
+            PGetWalQueueSizeRequest request = 
PGetWalQueueSizeRequest.newBuilder()
+                    .setTableId(tableId)
+                    .setTxnId(endTransactionId)
+                    .build();
+            PGetWalQueueSizeResponse response = null;
+            long start = System.currentTimeMillis();
+            boolean done = false;
+            long size = 0;
+            while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+                try {
+                    Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                            .getWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                    response = future.get();
+                } catch (Exception e) {
+                    LOG.warn("encounter exception while getting wal queue size 
on backend id: " + backend.getId()
+                            + ",exception:" + e);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code != TStatusCode.OK) {
+                    String msg = "get wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                            + response.getStatus();
+                    LOG.warn(msg);
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException ie) {
+                        LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                    }
+                    continue;
+                }
+                size = response.getSize();
+                done = true;
+            }
+            if (size > 0) {
+                LOG.info("backend id:" + backend.getId() + ",wal size:" + 
size);
+                empty = false;
+            }
+        }
+        return empty;
+    }
+
+    public long getAllWalQueueSize(Backend backend) {
+        PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder()
+                .build();
+        PGetWalQueueSizeResponse response = null;
+        long start = System.currentTimeMillis();
+        boolean done = false;
+        long size = 0;
+        while (!done && System.currentTimeMillis() - start <= 
Config.check_wal_queue_timeout_threshold) {
+            try {
+                Future<PGetWalQueueSizeResponse> future = 
BackendServiceProxy.getInstance()
+                        .getAllWalQueueSize(new 
TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                response = future.get();
+            } catch (Exception e) {
+                LOG.warn("encounter exception while getting all wal queue size 
on backend id: " + backend.getId()
+                        + ",exception:" + e);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                }
+                continue;
+            }
+            TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                String msg = "get all wal queue size fail,backend id: " + 
backend.getId() + ", status: "
+                        + response.getStatus();
+                LOG.warn(msg);
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ie) {
+                    LOG.info("group commit manager sleep wait 
InterruptedException: ", ie);
+                }
+                continue;
+            }
+            size = response.getSize();
+            done = true;
+        }
+        if (size > 0) {
+            LOG.info("backend id:" + backend.getId() + ",all wal size:" + 
size);

Review Comment:
   LOG.info("backend id: {}, all wal size: {}", backend.getId(), size);



##########
fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java:
##########
@@ -451,4 +453,29 @@ public Future<PGroupCommitInsertResponse> 
groupCommitInsert(TNetworkAddress addr
             throw new RpcException(address.hostname, e.getMessage());
         }
     }
+
+    public Future<PGetWalQueueSizeResponse> getWalQueueSize(TNetworkAddress 
address,
+            PGetWalQueueSizeRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.getWalQueueSize(request);
+        } catch (Throwable e) {
+            LOG.warn("failed to get wal queue size from address={}:{}", 
address.getHostname(),
+                    address.getPort(), e);
+            throw new RpcException(address.hostname, e.getMessage());

Review Comment:
    Consider including the original exception as the cause (if `RpcException` offers a constructor that accepts one), so its stack trace is not lost.



##########
fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java:
##########
@@ -158,6 +158,9 @@ private boolean isMatch(Backend backend) {
     }
 
     public List<Backend> getCandidateBackends(ImmutableCollection<Backend> 
backends) {
+        for (Backend backend : backends) {
+            LOG.info("candidate backends:" + backend.getId());

Review Comment:
   LOG.info("candidate backends: {}", backend.getId());



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to