ylcn91 commented on code in PR #26359: URL: https://github.com/apache/doris/pull/26359#discussion_r1382345896
########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); Review Comment: I think it's good practice to make "statusMap" 
final, since the field reference is never reassigned after initialization (only the map's contents change).
+ public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. + */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { Review Comment: This while loop could be refactored into a separate method to handle retries for better readability. Also, we are not handling the case where we exit the loop because the timeout is reached. Can we consider using a ScheduledExecutorService for better handling of repeated tasks with delays? ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",wal size:" + size); + empty = false; + } + } + return empty; + } + Review Comment: This method shares a lot of logic with isPreviousWalFinished and could benefit from a common retry mechanism. 
########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * cal wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2... 
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + return ResponseEntityBuilder.badRequest("No host:port specified"); Review Comment: The string "No host:port specified" is used twice, which makes it a candidate for a constant to avoid duplication and facilitate changes. 
########## fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java: ########## @@ -779,13 +796,28 @@ private void replayCreateJob(SchemaChangeJobV2 replayedJob) throws MetaNotFoundE // set table state olapTable.setState(OlapTableState.SCHEMA_CHANGE); + Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK); } finally { olapTable.writeUnlock(); } this.watershedTxnId = replayedJob.watershedTxnId; jobState = JobState.WAITING_TXN; LOG.info("replay pending schema change job: {}, table id: {}", jobId, tableId); + // wait wal done here + List<Long> aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + boolean walFinished = Env.getCurrentEnv().getGroupCommitManager() + .isPreviousWalFinished(tableId, watershedTxnId, aliveBeIds); + while (!walFinished) { + LOG.info("wai for wal queue size to be empty"); + walFinished = Env.getCurrentEnv().getGroupCommitManager() + .isPreviousWalFinished(tableId, watershedTxnId, aliveBeIds); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("schema change job sleep wait InterruptedException: ", ie); Review Comment: It's good practice to re-interrupt the current thread when it catches an InterruptedException. 
########## fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java: ########## @@ -1806,16 +1806,37 @@ private void handleInsertStmt() throws Exception { label = context.getTxnEntry().getLabel(); txnId = context.getTxnEntry().getTxnConf().getTxnId(); } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) { + while (Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId())) { + LOG.info("insert table " + insertStmt.getTargetTable().getId() + " is blocked"); Review Comment: LOG.info("insert table {} is blocked", insertStmt.getTargetTable().getId()); ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * cal wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2... 
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + List<HostInfo> hostInfos = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + try { + HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort); + hostInfos.add(hostInfo); + } catch (AnalysisException e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); Review Comment: Immediate return inside a loop will prevent processing further hosts if one fails.Consider aggregating all errors and returning a comprehensive response. ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { Review Comment: Using "synchronized" on the method level is a simple locking mechanism but can be a performance bottleneck. Consider using ConcurrentHashMap or other concurrent collections for better scalability. ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + 
statusMap.put(tableId, status); Review Comment: Logging here can help trace the status changes which can be critical for debugging. -> LOG.debug("Setting status for tableId {}: {}", tableId, status); ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",wal size:" + size); + empty = false; + } + } + return empty; + } + + public long getAllWalQueueSize(Backend backend) { + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while 
(!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getAllWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting all wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get all wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",all wal size:" + size); + } + return size; + } + Review Comment: Consider the use of Optional<TransactionState> to avoid returning null and provide a more functional approach. ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); Review Comment: consider a backoff strategy instead of a fixed sleep. ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * cal wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2... 
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { Review Comment: Using Guava's Strings utility class for checking null or empty strings is fine, but Spring has built-in mechanisms for request parameter validation that could be used here. ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get wal queue size fail,backend id: " + backend.getId() + ", status: " Review Comment: This could be logged at a different level than warn depending on the severity of this situation. LOG.warn("Failed to get WAL queue size, backend id: {}, status: {}", backend.getId(), response.getStatus()); Consider using exponential backoff or a configurable delay before retrying. ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * cal wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2... 
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + List<HostInfo> hostInfos = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + try { + HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort); + hostInfos.add(hostInfo); + } catch (AnalysisException e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + try { + List<Backend> backends = getBackends(hostInfos); + List<String> backendsList = new ArrayList<>(); + for (Backend backend : backends) { Review Comment: The WAL size calculation could potentially be parallelized if it does not involve shared state and is a heavy operation.
########## fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java: ########## @@ -1593,6 +1594,8 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, // set table state olapTable.setState(OlapTableState.SCHEMA_CHANGE); + Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK); + LOG.info("block table:" + tableId); Review Comment: Could be LOG.info("block table: {}", tableId); ########## fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java: ########## @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * cal wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2... 
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + List<HostInfo> hostInfos = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + try { + HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort); + hostInfos.add(hostInfo); + } catch (AnalysisException e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + try { + List<Backend> backends = getBackends(hostInfos); + List<String> backendsList = new ArrayList<>(); + for (Backend backend : backends) { + long size = Env.getCurrentEnv().getGroupCommitManager() + .getAllWalQueueSize(backend); + backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size); + } + return ResponseEntityBuilder.ok(backendsList); + } catch (DdlException e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); Review Comment: Can we think about differentiating between client errors (400s) and server errors (500s)? 
########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if 
(statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. + */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",wal size:" + size); + empty = 
false; + } + } + return empty; + } + + public long getAllWalQueueSize(Backend backend) { + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getAllWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting all wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get all wal queue size fail,backend id: " + backend.getId() + ", status: " Review Comment: LOG.warn("get all wal queue size failed, backend id: {}, status: {}", backend.getId(), response.getStatus()); ########## fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java: ########## @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private Map<Long, SchemaChangeStatus> statusMap = new HashMap<>(); + + public synchronized boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public synchronized void setStatus(long tableId, SchemaChangeStatus status) { + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List<Long> aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while (!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",wal size:" + size); + empty = false; + } + } + return empty; + } + + public long getAllWalQueueSize(Backend backend) { + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .build(); + PGetWalQueueSizeResponse response = null; + long start = System.currentTimeMillis(); + boolean done = false; + long size = 0; + while 
(!done && System.currentTimeMillis() - start <= Config.check_wal_queue_timeout_threshold) { + try { + Future<PGetWalQueueSizeResponse> future = BackendServiceProxy.getInstance() + .getAllWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting all wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get all wal queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + done = true; + } + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",all wal size:" + size); Review Comment: LOG.info("backend id: {}, all wal size: {}", backend.getId(), size); ########## fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java: ########## @@ -451,4 +453,29 @@ public Future<PGroupCommitInsertResponse> groupCommitInsert(TNetworkAddress addr throw new RpcException(address.hostname, e.getMessage()); } } + + public Future<PGetWalQueueSizeResponse> getWalQueueSize(TNetworkAddress address, + PGetWalQueueSizeRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.getWalQueueSize(request); + } catch (Throwable e) { + LOG.warn("failed to get wal queue size from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); Review Comment: consider including the original 
exception as the cause. ########## fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java: ########## @@ -158,6 +158,9 @@ private boolean isMatch(Backend backend) { } public List<Backend> getCandidateBackends(ImmutableCollection<Backend> backends) { + for (Backend backend : backends) { + LOG.info("candidate backends:" + backend.getId()); Review Comment: LOG.info("candidate backends: {}", backend.getId()); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org