This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 02e69d14c3b [refactor](coordinator) remove non pipeline code from coordinator (#35888)

02e69d14c3b is described below

commit 02e69d14c3b1e86ae3556033ef98c19214a3799e
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Jun 5 16:02:36 2024 +0800

    [refactor](coordinator) remove non pipeline code from coordinator (#35888)

    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 903 +++------------------
 1 file changed, 101 insertions(+), 802 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 94e7d59625a..1f14b6f3b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -33,7 +33,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.FileQueryScanNode;
@@ -88,7 +87,6 @@ import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
@@ -213,15 +211,9 @@ public class Coordinator implements CoordInterface {
 
     private final List<PlanFragment> fragments;
 
-    private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
     private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = Maps.newHashMap();
 
-    // backend execute state
-    private final List<BackendExecState> backendExecStates = Lists.newArrayList();
     private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
-    // backend which state need to be checked when joining this coordinator.
-    // It is supposed to be the subset of backendExecStates.
-    private final List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
     private ResultReceiver receiver;
     protected final List<ScanNode> scanNodes;
@@ -501,14 +493,12 @@ public class Coordinator implements CoordInterface {
     public void clearExportStatus() {
         lock.lock();
         try {
-            this.backendExecStates.clear();
             this.pipelineExecContexts.clear();
             this.queryStatus.updateStatus(TStatusCode.OK, "");
             if (this.exportFiles == null) {
                 this.exportFiles = Lists.newArrayList();
             }
             this.exportFiles.clear();
-            this.needCheckBackendExecStates.clear();
             this.needCheckPipelineExecContexts.clear();
         } finally {
             lock.unlock();
         }
@@ -525,16 +515,9 @@ public class Coordinator implements CoordInterface {
     public Map<String, Integer> getBeToInstancesNum() {
         Map<String, Integer> result = Maps.newTreeMap();
-        if (enablePipelineEngine) {
-            for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
-                result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port),
-                        ctxs.getInstanceNumber());
-            }
-        } else {
-            for (BackendExecStates states : beToExecStates.values()) {
-                result.put(states.brpcAddr.hostname.concat(":").concat("" + states.brpcAddr.port),
-                        states.states.size());
-            }
+        for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
+            result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + ctxs.brpcAddr.port),
+                    ctxs.getInstanceNumber());
         }
         return result;
     }
@@ -736,163 +719,7 @@ public class Coordinator implements CoordInterface {
         }
         updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
-        if (enablePipelineEngine) {
-            sendPipelineCtx();
-        } else {
-            sendFragment();
-        }
-    }
-
-    /**
-     * The logic for sending query plan fragments is as follows:
-     * First, plan fragments are dependent. According to the order in "fragments" list,
-     * it must be ensured that on the BE side, the next fragment instance can be executed
-     * only after the previous fragment instance is ready,
-     * <p>
-     * In the previous logic, we will send fragment instances in sequence through RPC,
-     * and will wait for the RPC of the previous fragment instance to return successfully
-     * before sending the next one. But for some complex queries, this may lead to too many RPCs.
-     * <p>
-     * The optimized logic is as follows:
-     * 1. If the number of fragment instance is <= 2, the original logic is still used
-     * to complete the sending of fragments through at most 2 RPCs.
-     * 2. If the number of fragment instance is >= 3, first group all fragments by BE,
-     * and send all fragment instances to the corresponding BE node through the FIRST rpc,
-     * but these fragment instances will only perform the preparation phase but will not be actually executed.
-     * After that, the execution logic of all fragment instances is started through the SECOND RPC.
-     * <p>
-     * After optimization, a query on a BE node will only send two RPCs at most.
-     * Thereby reducing the "send fragment timeout" error caused by too many RPCs and BE unable to process in time.
-     *
-     * @throws TException
-     * @throws RpcException
-     * @throws UserException
-     */
-    private void sendFragment() throws TException, RpcException, UserException {
-        lock();
-        try {
-            Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
-            for (FragmentExecParams params : fragmentExecParamsMap.values()) {
-                for (FInstanceExecParam fi : params.instanceExecParams) {
-                    hostCounter.add(fi.host);
-                }
-            }
-
-            int backendIdx = 0;
-            int profileFragmentId = 0;
-            long memoryLimit = queryOptions.getMemLimit();
-            Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
-            beToExecStates.clear();
-            // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
-            // else use exec_plan_fragments directly.
-            // we choose #fragments >=2 because in some cases
-            // we need ensure that A fragment is already prepared to receive data before B fragment sends data.
-            // For example: select * from numbers("number"="10") will generate ExchangeNode and
-            // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does
-            // not send data until ExchangeNode is ready to receive.
-            boolean twoPhaseExecution = fragments.size() >= 2;
-            for (PlanFragment fragment : fragments) {
-                FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
-
-                // 1. set up exec states
-                int instanceNum = params.instanceExecParams.size();
-                Preconditions.checkState(instanceNum > 0);
-                List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
-
-                // 2. update memory limit for colocate join
-                if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
-                    int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
-                    long newMemory = memoryLimit / rate;
-                    // TODO(zxy): The meaning of mem limit in query_options has become the real once query mem limit.
-                    // The logic to modify mem_limit here needs to be modified or deleted.
-                    for (TExecPlanFragmentParams tParam : tParams) {
-                        tParam.query_options.setMemLimit(newMemory);
-                    }
-                }
-
-                boolean needCheckBackendState = false;
-                if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
-                    // this is a load process, and it is the first fragment.
-                    // we should add all BackendExecState of this fragment to needCheckBackendExecStates,
-                    // so that we can check these backends' state when joining this Coordinator
-                    needCheckBackendState = true;
-                }
-
-                // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
-                int instanceId = 0;
-                for (TExecPlanFragmentParams tParam : tParams) {
-                    BackendExecState execState =
-                            new BackendExecState(fragment.getFragmentId(), instanceId++,
-                                    tParam, this.addressToBackendID, executionProfile);
-                    // Each tParam will set the total number of Fragments that need to be executed on the same BE,
-                    // and the BE will determine whether all Fragments have been executed based on this information.
-                    // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
-                    tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
-                    tParam.setBackendId(execState.backend.getId());
-                    tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
-
-                    backendExecStates.add(execState);
-                    if (needCheckBackendState) {
-                        needCheckBackendExecStates.add(execState);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("add need check backend {} for fragment, {} job: {}",
-                                    execState.backend.getId(), fragment.getFragmentId().asInt(), jobId);
-                        }
-                    }
-
-                    BackendExecStates states = beToExecStates.get(execState.backend.getId());
-                    if (states == null) {
-                        states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress,
-                                twoPhaseExecution, execState.backend.getProcessEpoch());
-                        beToExecStates.putIfAbsent(execState.backend.getId(), states);
-                    }
-                    states.addState(execState);
-                    if (tParam.getFragment().getOutputSink() != null
-                            && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
-                        numSinkOnBackend.merge(execState.backend.getId(), 1, Integer::sum);
-                    }
-                    ++backendIdx;
-                }
-                int loadStreamPerNode = 1;
-                if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
-                    loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
-                }
-                for (TExecPlanFragmentParams tParam : tParams) {
-                    if (tParam.getFragment().getOutputSink() != null
-                            && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
-                        tParam.setLoadStreamPerNode(loadStreamPerNode);
-                        tParam.setTotalLoadStreams(numSinkOnBackend.size() * loadStreamPerNode);
-                        tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
-                        LOG.info("num local sink for backend {} is {}", tParam.getBackendId(),
-                                numSinkOnBackend.get(tParam.getBackendId()));
-                    }
-                }
-                profileFragmentId += 1;
-            } // end for fragments
-
-            // 4. send and wait fragments rpc
-            List<Triple<BackendExecStates, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
-                    futures = Lists.newArrayList();
-
-            for (BackendExecStates states : beToExecStates.values()) {
-                states.unsetFields();
-                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
-                futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
-            }
-            waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
-
-            if (twoPhaseExecution) {
-                // 5. send and wait execution start rpc
-                futures.clear();
-                for (BackendExecStates states : beToExecStates.values()) {
-                    BackendServiceProxy proxy = BackendServiceProxy.getInstance();
-                    futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy)));
-                }
-                waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
-            }
-        } finally {
-            unlock();
-        }
+        sendPipelineCtx();
     }
 
     private void sendPipelineCtx() throws TException, RpcException, UserException {
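The removed javadoc above describes the two-phase dispatch protocol that the surviving pipeline path keeps: all fragments for a backend go out in one "prepare" RPC, and a second RPC triggers execution once every backend has acknowledged, so each BE sees at most two RPCs per query. A minimal sketch of that pattern, with `BackendStub`, `prepareFragments`, and `startExecution` as hypothetical stand-ins for the Doris RPC stubs (not the real API):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a BE's async RPC interface; names are illustrative.
interface BackendStub {
    CompletableFuture<Void> prepareFragments(List<String> fragmentParams); // RPC 1: prepare only
    CompletableFuture<Void> startExecution();                              // RPC 2: begin execution
}

public final class TwoPhaseDispatch {
    // Each backend receives at most two RPCs per query: one carrying all of its
    // fragments (prepared but not started), and one that starts them all.
    public static void dispatch(Map<BackendStub, List<String>> fragmentsByBackend) {
        CompletableFuture<?>[] prepared = fragmentsByBackend.entrySet().stream()
                .map(e -> e.getKey().prepareFragments(e.getValue()))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(prepared).join(); // every BE is now ready to receive data

        CompletableFuture<?>[] started = fragmentsByBackend.keySet().stream()
                .map(BackendStub::startExecution)
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(started).join();  // execution begins only after all prepares succeed
    }
}
```

The barrier between the two phases is the point of the design: an exchange-consuming fragment is guaranteed to be prepared before any sender starts pushing data to it.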
@@ -1059,83 +886,6 @@
         }
     }
 
-    private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>>> futures,
-            long leftTimeMs,
-            String operation) throws RpcException, UserException {
-        if (leftTimeMs <= 0) {
-            long currentTimeMillis = System.currentTimeMillis();
-            long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
-            String msg = String.format(
-                    "timeout before waiting %s rpc, query timeout:%d, already elapsed:%d, left for this:%d",
-                    operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
-            LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
-            if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
-                LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
-                        + ", query timeout: is_set:{}, value: {}, "
-                        + "coordinator timeout deadline {}, cur time millis: {}",
-                        DebugUtil.printId(queryId),
-                        queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
-                        queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
-                        timeoutDeadline, currentTimeMillis);
-            }
-            throw new UserException(msg);
-        }
-
-        long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
-        for (Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) {
-            TStatusCode code;
-            String errMsg = null;
-            Exception exception = null;
-
-            try {
-                PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
-                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
-                        errMsg = result.getStatus().getErrorMsgsList().get(0);
-                    } else {
-                        errMsg = operation + " failed. backend id: " + triple.getLeft().beId;
-                    }
-                }
-            } catch (ExecutionException e) {
-                exception = e;
-                code = TStatusCode.THRIFT_RPC_ERROR;
-                triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
-            } catch (InterruptedException e) {
-                exception = e;
-                code = TStatusCode.INTERNAL_ERROR;
-            } catch (TimeoutException e) {
-                exception = e;
-                errMsg = String.format(
-                        "timeout when waiting for %s rpc, query timeout:%d, left timeout for this operation:%d",
-                        operation, queryOptions.getExecutionTimeout(), timeoutMs / 1000);
-                LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
-                code = TStatusCode.TIMEOUT;
-            }
-
-            if (code != TStatusCode.OK) {
-                if (exception != null && errMsg == null) {
-                    errMsg = operation + " failed. " + exception.getMessage();
-                }
-                queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
-                cancelInternal(queryStatus);
-                switch (code) {
-                    case TIMEOUT:
-                        MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
-                                .increase(1L);
-                        throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
-                    case THRIFT_RPC_ERROR:
-                        MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
-                                .increase(1L);
-                        SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg);
-                        throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
-                    default:
-                        throw new UserException(errMsg, exception);
-                }
-            }
-        }
-    }
-
     private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServiceProxy,
             Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
             String operation) throws RpcException, UserException {
@@ -1413,56 +1163,30 @@
         try {
             lock();
-
-            if (queryOptions.isEnablePipelineEngine()) {
-                for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
-                    Backend be = curBeMap.get(pipelineExecContext.backend.getId());
-                    if (be == null || !be.isAlive()) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend {} not exists or dead, query {} should be cancelled",
-                                pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    }
-
-                    // Backend process epoch changed, indicates that this be restarts, query should be cancelled.
-                    // Check zero since during upgrading, older version oplog will not persistent be start time
-                    // so newer version follower will get zero epoch when replaying oplog or snapshot
-                    if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend process epoch changed, previous {} now {}, "
-                                        + "means this be has already restarted, should cancel this coordinator,"
-                                        + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
-                                DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    } else if (be.getProcessEpoch() == 0) {
-                        LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
-                                be.toString());
-                    }
-                }
-            } else {
-                // beToExecStates will be updated only in non-pipeline query.
-                for (BackendExecStates beExecState : beToExecStates.values()) {
-                    Backend be = curBeMap.get(beExecState.beId);
-                    if (be == null || !be.isAlive()) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend {} not exists or dead, query {} should be cancelled.",
-                                beExecState.beId, DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    }
-
-                    if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Process epoch changed, previous {} now {}, means this be has already restarted,"
-                                        + "should cancel this coordinator, query id {}",
-                                beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    } else if (be.getProcessEpoch() == 0) {
-                        LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
-                    }
+            for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
+                Backend be = curBeMap.get(pipelineExecContext.backend.getId());
+                if (be == null || !be.isAlive()) {
+                    Status errorStatus = new Status(TStatusCode.CANCELLED,
+                            "Backend {} not exists or dead, query {} should be cancelled",
+                            pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
+                    LOG.warn(errorStatus.getErrorMsg());
+                    return errorStatus;
+                }
+
+                // Backend process epoch changed, indicates that this be restarts, query should be cancelled.
+                // Check zero since during upgrading, older version oplog will not persistent be start time
+                // so newer version follower will get zero epoch when replaying oplog or snapshot
+                if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
+                    Status errorStatus = new Status(TStatusCode.CANCELLED,
+                            "Backend process epoch changed, previous {} now {}, "
+                                    + "means this be has already restarted, should cancel this coordinator,"
+                                    + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
+                            DebugUtil.printId(queryId));
+                    LOG.warn(errorStatus.getErrorMsg());
+                    return errorStatus;
+                } else if (be.getProcessEpoch() == 0) {
+                    LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
+                            be.toString());
                 }
             }
@@ -1538,14 +1262,8 @@
     }
 
     private void cancelRemoteFragmentsAsync(Status cancelReason) {
-        if (enablePipelineEngine) {
-            for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
-                ctx.cancelQuery(cancelReason);
-            }
-        } else {
-            for (BackendExecStates backendExecState : beToExecStates.values()) {
-                backendExecState.cancelQuery(cancelReason);
-            }
+        for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
+            ctx.cancelQuery(cancelReason);
         }
     }
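The epoch check kept in the hunk above is a small but load-bearing liveness test: a backend that restarts between dispatch and completion silently loses its fragments, so the coordinator compares the process epoch it recorded at dispatch with the backend's current epoch, tolerating epoch 0 during rolling upgrades. The idea in isolation, as a hedged sketch (`BackendInfo` and the method names are illustrative, not the Doris classes):

```java
import java.util.Map;

// Illustrative model of a backend's liveness state; not the Doris Backend class.
record BackendInfo(long processEpoch, boolean alive) {}

public final class EpochCheck {
    /**
     * Returns a cancellation reason if the query can no longer complete on this
     * backend, or null if it still looks healthy. Epoch 0 is tolerated: during a
     * rolling upgrade an older FE may not have persisted backend start times, so
     * a replaying follower sees a zero epoch it cannot compare against.
     */
    public static String shouldCancel(long epochAtDispatch, long backendId,
                                      Map<Long, BackendInfo> currentBackends) {
        BackendInfo be = currentBackends.get(backendId);
        if (be == null || !be.alive()) {
            return "backend " + backendId + " not exists or dead";
        }
        if (be.processEpoch() != 0 && be.processEpoch() != epochAtDispatch) {
            return "backend " + backendId + " restarted (epoch " + epochAtDispatch
                    + " -> " + be.processEpoch() + ")";
        }
        return null;
    }
}
```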
@@ -2546,201 +2264,63 @@
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        if (enablePipelineXEngine) {
-            PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (ctx == null || !ctx.updatePipelineStatus(params)) {
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-            if (!status.ok()) {
-                if (returnedAllResults && status.isCancelled()) {
-                    LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
-                            + " is reporting failed status {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(),
-                            status.toString());
-                } else {
-                    LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
-                            + " error message: {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(), status.toString());
-                    updateStatus(status);
-                }
-            }
-            if (params.isSetDeltaUrls()) {
-                updateDeltas(params.getDeltaUrls());
-            }
-            if (params.isSetLoadCounters()) {
-                updateLoadCounters(params.getLoadCounters());
-            }
-            if (params.isSetTrackingUrl()) {
-                trackingUrl = params.getTrackingUrl();
-            }
-            if (params.isSetExportFiles()) {
-                updateExportFiles(params.getExportFiles());
-            }
-            if (params.isSetCommitInfos()) {
-                updateCommitInfos(params.getCommitInfos());
-            }
-            if (params.isSetErrorTabletInfos()) {
-                updateErrorTabletInfos(params.getErrorTabletInfos());
-            }
-            if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
-                hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-            }
-            if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
-                icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-            }
-
-            if (ctx.done) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} fragment {} is marked done",
-                            DebugUtil.printId(queryId), ctx.fragmentId);
-                }
-                fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
-            }
-        } else if (enablePipelineEngine) {
-            PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (ctx == null || !ctx.updatePipelineStatus(params)) {
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-            if (!status.ok()) {
-                if (returnedAllResults && status.isCancelled()) {
-                    LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
-                            + " is reporting failed status {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(),
-                            status.toString());
-                } else {
-                    LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
-                            + " error message: {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(), status.toString());
-                    updateStatus(status);
-                }
-            }
-
-            // params.isDone() should be promised.
-            // There are some periodic reports during the load process,
-            // and the reports from the intermediate process may be concurrent with the last report.
-            // The last report causes the counter to decrease to zero,
-            // but it is possible that the report without commit-info triggered the commit operation,
-            // resulting in the data not being published.
-            if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) && params.isDone()) {
-                if (params.isSetDeltaUrls()) {
-                    updateDeltas(params.getDeltaUrls());
-                }
-                if (params.isSetLoadCounters()) {
-                    updateLoadCounters(params.getLoadCounters());
-                }
-                if (params.isSetTrackingUrl()) {
-                    trackingUrl = params.getTrackingUrl();
-                }
-                if (params.isSetExportFiles()) {
-                    updateExportFiles(params.getExportFiles());
-                }
-                if (params.isSetCommitInfos()) {
-                    updateCommitInfos(params.getCommitInfos());
-                }
-                if (params.isSetErrorTabletInfos()) {
-                    updateErrorTabletInfos(params.getErrorTabletInfos());
-                }
-                if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
-                    hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-                }
-                if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
-                    icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} instance {} is marked done",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
-                }
-                instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} instance {} is not marked done",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
-                }
-            }
-        } else {
-            if (params.backend_num >= backendExecStates.size()) {
-                LOG.warn("Query {} instance {} unknown backend number: {}, expected less than: {}",
-                        DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.backend_num, backendExecStates.size());
-                return;
-            }
-            BackendExecState execState = backendExecStates.get(params.backend_num);
-            if (!execState.updateInstanceStatus(params)) {
-                // Has to return here, to avoid out of order report messages. For example,
-                // the first message is done, then we update commit messages, but the new
-                // message is running, then we will also update commit messages. It will
-                // lead to data corrupt.
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
-            if (!status.ok()) {
-                if (status.isCancelled() && returnedAllResults) {
-                    LOG.warn("Query {} has returned all results, its instance {} is reporting failed status {}",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.toString());
-                } else {
-                    LOG.warn("Instance {} of query {} report failed status, error msg: {}",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.toString());
-                    updateStatus(status);
-                }
             }
+        PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
+        if (ctx == null || !ctx.updatePipelineStatus(params)) {
+            return;
+        }
-
         Status status = new Status(params.status);
+        // for now, abort the query if we see any error except if the error is cancelled
+        // and returned_all_results_ is true.
+        // (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
+        if (!status.ok()) {
+            if (returnedAllResults && status.isCancelled()) {
+                LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
+                        + " is reporting failed status {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(),
+                        status.toString());
             } else {
+                LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
+                        + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.toString());
+                updateStatus(status);
             }
+        }
+        if (params.isSetDeltaUrls()) {
+            updateDeltas(params.getDeltaUrls());
+        }
+        if (params.isSetLoadCounters()) {
+            updateLoadCounters(params.getLoadCounters());
+        }
+        if (params.isSetTrackingUrl()) {
+            trackingUrl = params.getTrackingUrl();
+        }
+        if (params.isSetExportFiles()) {
+            updateExportFiles(params.getExportFiles());
+        }
+        if (params.isSetCommitInfos()) {
+            updateCommitInfos(params.getCommitInfos());
+        }
+        if (params.isSetErrorTabletInfos()) {
+            updateErrorTabletInfos(params.getErrorTabletInfos());
+        }
+        if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
+            hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
+        }
+        if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
+            icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
+        }
-            // params.isDone() should be promised.
-            // There are some periodic reports during the load process,
-            // and the reports from the intermediate process may be concurrent with the last report.
-            // The last report causes the counter to decrease to zero,
-            // but it is possible that the report without commit-info triggered the commit operation,
-            // resulting in the data not being published.
-            if (execState.done && params.isDone()) {
-                if (params.isSetDeltaUrls()) {
-                    updateDeltas(params.getDeltaUrls());
-                }
-                if (params.isSetLoadCounters()) {
-                    updateLoadCounters(params.getLoadCounters());
-                }
-                if (params.isSetTrackingUrl()) {
-                    trackingUrl = params.getTrackingUrl();
-                }
-                if (params.isSetExportFiles()) {
-                    updateExportFiles(params.getExportFiles());
-                }
-                if (params.isSetCommitInfos()) {
-                    updateCommitInfos(params.getCommitInfos());
-                }
-                if (params.isSetErrorTabletInfos()) {
-                    updateErrorTabletInfos(params.getErrorTabletInfos());
-                }
-                if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
-                    hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-                }
-                if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
-                    icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-                }
-                instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
             }
+        if (ctx.done) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Query {} fragment {} is marked done",
+                        DebugUtil.printId(queryId), ctx.fragmentId);
             }
+            fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
         }
 
         if (params.isSetLoadedRows() && jobId != -1) {
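Both the removed `updateInstanceStatus` and the surviving `updatePipelineStatus` enforce the invariant the removed comments above describe: reports arrive periodically and can be concurrent or out of order, so only the first report that carries `done` may trigger commit-time side effects, and everything after it must be ignored. The guard in isolation, as a hedged sketch (the class name is illustrative, not the Doris class):

```java
// Illustrative reduction of the duplicate-report guard; not the Doris class.
public final class FragmentDoneGuard {
    private boolean done = false;

    /**
     * Returns true exactly once: for the first report with done=true.
     * Intermediate reports (done=false) and duplicate or late final reports
     * return false, so callers never apply commit info twice.
     */
    public synchronized boolean markDoneOnce(boolean reportIsDone) {
        if (!reportIsDone || done) {
            return false;
        }
        done = true;
        return true;
    }
}
```

Returning false on a duplicate is what prevents the out-of-order case the removed comment warns about, where a late "running" report could re-trigger commit updates after the final one and leave data unpublished or corrupted.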
@@ -2800,21 +2380,11 @@
      * return true if all of them are OK. Otherwise, return false.
      */
     private boolean checkBackendState() {
-        if (enablePipelineEngine) {
-            for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
-                if (!ctx.isBackendStateHealthy()) {
-                    queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
-                            + ctx.backend.getId() + " is down");
-                    return false;
-                }
-            }
-        } else {
-            for (BackendExecState backendExecState : needCheckBackendExecStates) {
-                if (!backendExecState.isBackendStateHealthy()) {
-                    queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
-                            + backendExecState.backend.getId() + " is down");
-                    return false;
-                }
+        for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
+            if (!ctx.isBackendStateHealthy()) {
+                queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
+                        + ctx.backend.getId() + " is down");
+                return false;
             }
         }
         return true;
     }
@@ -3128,85 +2698,6 @@
     private final BucketShuffleJoinController bucketShuffleJoinController
             = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
-    // record backend execute state
-    // TODO(zhaochun): add profile information and others
-    public class BackendExecState {
-        TExecPlanFragmentParams rpcParams;
-        PlanFragmentId fragmentId;
-        boolean initiated;
-        volatile boolean done;
-        TNetworkAddress brpcAddress;
-        TNetworkAddress address;
-        Backend backend;
-        long lastMissingHeartbeatTime = -1;
-        TUniqueId instanceId;
-
-        public BackendExecState(PlanFragmentId fragmentId, int instanceId,
-                TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID,
-                ExecutionProfile executionProfile) {
-            this.fragmentId = fragmentId;
-            this.rpcParams = rpcParams;
-            this.initiated = false;
-            this.done = false;
-            FInstanceExecParam fi = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
-            this.instanceId = fi.instanceId;
-            this.address = fi.host;
-            this.backend = idToBackend.get(addressToBackendID.get(address));
-            this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
-            String profileName = "Instance " + DebugUtil.printId(
-                    fi.instanceId) + " (host=" + this.backend.getHeartbeatAddress() + ")";
-            RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
-            executionProfile.addInstanceProfile(fragmentId, fi.instanceId, instanceProfile);
-        }
-
-        /**
-         * Some information common to all Fragments does not need to be sent repeatedly.
-         * Therefore, when we confirm that a certain BE has accepted the information,
-         * we will delete the information in the subsequent Fragment to avoid repeated sending.
-         * This information can be obtained from the cache of BE.
-         */
-        public void unsetFields() {
-            this.rpcParams.unsetDescTbl();
-            this.rpcParams.unsetFileScanParams();
-            this.rpcParams.unsetCoord();
-            this.rpcParams.unsetQueryGlobals();
-            this.rpcParams.unsetResourceInfo();
-            this.rpcParams.setIsSimplifiedParam(true);
-        }
-
-        // update the instance status, if it is already finished, then not update any more.
-        public synchronized boolean updateInstanceStatus(TReportExecStatusParams params) {
-            if (this.done) {
-                // duplicate packet
-                return false;
-            }
-            this.done = params.done;
-            if (statsErrorEstimator != null) {
-                statsErrorEstimator.updateExactReturnedRows(params);
-            }
-            return true;
-        }
-
-        public boolean isBackendStateHealthy() {
-            if (backend.getLastMissingHeartbeatTime() > lastMissingHeartbeatTime && !backend.isAlive()) {
-                LOG.warn("backend {} is down while joining the coordinator. job id: {}",
-                        backend.getId(), jobId);
-                return false;
-            }
-            return true;
-        }
-
-        public FragmentInstanceInfo buildFragmentInstanceInfo() {
-            return new QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
-                    .fragmentId(String.valueOf(fragmentId)).address(this.address).build();
-        }
-
-        private TUniqueId fragmentInstanceId() {
-            return this.rpcParams.params.getFragmentInstanceId();
-        }
-    }
-
     public class PipelineExecContext {
         TPipelineFragmentParams rpcParams;
         PlanFragmentId fragmentId;
@@ -3272,31 +2763,12 @@
             if (!params.done) {
                 return false;
             }
-            if (enablePipelineX) {
-                if (this.done) {
-                    // duplicate packet
-                    return false;
-                }
-                this.done = true;
-                return true;
-            } else {
-                // could not find the related instances, not update and return false, to indicate
-                // that the caller should not update any more.
-                if (!fragmentInstancesMap.containsKey(params.fragment_instance_id)) {
-                    return false;
-                }
-                Boolean instanceDone = fragmentInstancesMap.get(params.fragment_instance_id);
-                if (instanceDone) {
-                    // duplicate packet
-                    return false;
-                }
-                fragmentInstancesMap.put(params.fragment_instance_id, true);
-                profileReportProgress++;
-                if (profileReportProgress == numInstances) {
-                    this.done = true;
-                }
-                return true;
+            if (this.done) {
+                // duplicate packet
+                return false;
             }
+            this.done = true;
+            return true;
         }
 
         public boolean isBackendStateHealthy() {
@@ -3315,159 +2787,6 @@
         }
     }
 
-    /**
-     * A set of BackendExecState for same Backend
-     */
-    public class BackendExecStates {
-        long beId;
-        TNetworkAddress brpcAddr;
-        List<BackendExecState> states = Lists.newArrayList();
-        boolean twoPhaseExecution = false;
-        long beProcessEpoch = 0;
-        boolean hasCancelled = false;
-        boolean cancelInProcess = false;
-
-        public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution, long beProcessEpoch) {
-            this.beId = beId;
-            this.brpcAddr = brpcAddr;
-            this.twoPhaseExecution = twoPhaseExecution;
-            this.beProcessEpoch = beProcessEpoch;
-        }
-
-        public void addState(BackendExecState state) {
-            this.states.add(state);
-        }
-
-        /**
-         * The BackendExecState in states are all send to the same BE.
-         * So only the first BackendExecState need to carry some common fields, such as DescriptorTbl,
-         * the other BackendExecState does not need those fields. Unset them to reduce size.
-         */
-        public void unsetFields() {
-            boolean first = true;
-            for (BackendExecState state : states) {
-                if (first) {
-                    first = false;
-                    continue;
-                }
-                state.unsetFields();
-            }
-        }
-
-        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
-                throws TException {
-            try {
-                TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
-                for (BackendExecState state : states) {
-                    state.initiated = true;
-                    paramsList.addToParamsList(state.rpcParams);
-                }
-                return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
-            } catch (RpcException e) {
-                // DO NOT throw exception here, return a complete future with error code,
-                // so that the following logic will cancel the fragment.
-                return futureWithException(e);
-            }
-        }
-
-        public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(BackendServiceProxy proxy)
-                throws TException {
-            try {
-                PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder();
-                PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
-                builder.setQueryId(qid);
-                return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build());
-            } catch (RpcException e) {
-                // DO NOT throw exception here, return a complete future with error code,
-                // so that the following logic will cancel the fragment.
-                return futureWithException(e);
-            }
-        }
-
-        @NotNull
-        private Future<PExecPlanFragmentResult> futureWithException(RpcException e) {
-            return new Future<PExecPlanFragmentResult>() {
-                @Override
-                public boolean cancel(boolean mayInterruptIfRunning) {
-                    return false;
-                }
-
-                @Override
-                public boolean isCancelled() {
-                    return false;
-                }
-
-                @Override
-                public boolean isDone() {
-                    return true;
-                }
-
-                @Override
-                public PExecPlanFragmentResult get() {
-                    PExecPlanFragmentResult result = PExecPlanFragmentResult.newBuilder().setStatus(
-                            Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
-                                    .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
-                    return result;
-                }
-
-                @Override
-                public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
-                    return get();
-                }
-            };
-        }
-
-        // cancel the fragment instance.
-        // return true if cancel success. Otherwise, return false
-        public synchronized void cancelQuery(Status cancelReason) {
-            LOG.warn("cancelRemoteFragments backend: {}, reason: {}",
-                    idToBackend.get(beId), cancelReason.toString());
-            try {
-                if (this.hasCancelled || this.cancelInProcess) {
-                    LOG.info("Fragment instance has already been cancelled {} or under cancel {}."
-                            + " backend: {}, reason: {}",
-                            this.hasCancelled, this.cancelInProcess,
-                            idToBackend.get(beId), cancelReason.toString());
-                    return;
-                }
-                try {
-                    ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
-                            BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddr, cancelReason);
-                    Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() {
-                        public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
-                            cancelInProcess = false;
-                            if (result.hasStatus()) {
-                                Status status = new Status(result.getStatus());
-                                if (status.getErrorCode() == TStatusCode.OK) {
-                                    hasCancelled = true;
-                                } else {
-                                    LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
-                                            DebugUtil.printId(queryId), idToBackend.get(beId), status.toString());
-                                }
-                            }
-                            LOG.warn("Failed to cancel query {} backend: {}, without status",
-                                    DebugUtil.printId(queryId), idToBackend.get(beId));
-                        }
-
-                        public void onFailure(Throwable t) {
-                            cancelInProcess = false;
-                            LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
-                                    DebugUtil.printId(queryId), idToBackend.get(beId), cancelReason.toString(), t);
-                        }
-                    }, backendRpcCallbackExecutor);
-                    cancelInProcess = true;
-                } catch (RpcException e) {
-                    LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddr.getHostname(),
-                            brpcAddr.getPort());
-                    SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddr), e.getMessage());
-                }
-
-            } catch (Exception e) {
-                LOG.warn("catch a exception", e);
-            }
-        }
-    }
-
     public class PipelineExecContexts {
         long beId;
         TNetworkAddress brpcAddr;
@@ -4012,26 +3331,13 @@
                 Lists.newArrayList();
         lock();
         try {
-            if (enablePipelineEngine) {
-                for (int index = 0; index < fragments.size(); index++) {
-                    for (PipelineExecContext ctx : pipelineExecContexts.values()) {
-                        if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
-                            continue;
-                        }
-                        final List<QueryStatisticsItem.FragmentInstanceInfo> info = ctx.buildFragmentInstanceInfo();
-                        result.addAll(info);
-                    }
-                }
-            } else {
-                for (int index = 0; index < fragments.size(); index++) {
-                    for (BackendExecState backendExecState : backendExecStates) {
-                        if (fragments.get(index).getFragmentId() != backendExecState.fragmentId) {
-                            continue;
-                        }
-                        final QueryStatisticsItem.FragmentInstanceInfo info =
-                                backendExecState.buildFragmentInstanceInfo();
-                        result.add(info);
+            for (int index = 0; index < fragments.size(); index++) {
+                for (PipelineExecContext ctx : pipelineExecContexts.values()) {
+                    if (fragments.get(index).getFragmentId() != ctx.fragmentId) {
+                        continue;
                     }
+                    final List<QueryStatisticsItem.FragmentInstanceInfo> info = ctx.buildFragmentInstanceInfo();
+                    result.addAll(info);
                 }
             }
         } finally {
@@ -4043,16 +3349,9 @@
     @Override
     public List<TNetworkAddress> getInvolvedBackends() {
         List<TNetworkAddress> backendAddresses = Lists.newArrayList();
-        if (this.enablePipelineXEngine) {
-            for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
-                Backend backend = idToBackend.get(backendId);
-                backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
-            }
-        } else {
-            for (Long backendId : this.beToExecStates.keySet()) {
-                Backend backend = idToBackend.get(backendId);
-                backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
-            }
+        for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
+            Backend backend = idToBackend.get(backendId);
+            backendAddresses.add(new TNetworkAddress(backend.getHost(), backend.getBePort()));
         }
         return backendAddresses;
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org