This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 02e69d14c3b [refactor](coordinator) remove non pipeline code from coordinator (#35888)
02e69d14c3b is described below

commit 02e69d14c3b1e86ae3556033ef98c19214a3799e
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Jun 5 16:02:36 2024 +0800

    [refactor](coordinator) remove non pipeline code from coordinator (#35888)
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 903 +++------------------
 1 file changed, 101 insertions(+), 802 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 94e7d59625a..1f14b6f3b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -33,7 +33,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.FileQueryScanNode;
@@ -88,7 +87,6 @@ import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
@@ -213,15 +211,9 @@ public class Coordinator implements CoordInterface {
 
     private final List<PlanFragment> fragments;
 
-    private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
     private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = 
Maps.newHashMap();
 
-    // backend execute state
-    private final List<BackendExecState> backendExecStates = 
Lists.newArrayList();
     private final Map<Pair<Integer, Long>, PipelineExecContext> 
pipelineExecContexts = new HashMap<>();
-    // backend which state need to be checked when joining this coordinator.
-    // It is supposed to be the subset of backendExecStates.
-    private final List<BackendExecState> needCheckBackendExecStates = 
Lists.newArrayList();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = 
Lists.newArrayList();
     private ResultReceiver receiver;
     protected final List<ScanNode> scanNodes;
@@ -501,14 +493,12 @@ public class Coordinator implements CoordInterface {
     public void clearExportStatus() {
         lock.lock();
         try {
-            this.backendExecStates.clear();
             this.pipelineExecContexts.clear();
             this.queryStatus.updateStatus(TStatusCode.OK, "");
             if (this.exportFiles == null) {
                 this.exportFiles = Lists.newArrayList();
             }
             this.exportFiles.clear();
-            this.needCheckBackendExecStates.clear();
             this.needCheckPipelineExecContexts.clear();
         } finally {
             lock.unlock();
@@ -525,16 +515,9 @@ public class Coordinator implements CoordInterface {
 
     public Map<String, Integer> getBeToInstancesNum() {
         Map<String, Integer> result = Maps.newTreeMap();
-        if (enablePipelineEngine) {
-            for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
-                result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + 
ctxs.brpcAddr.port),
-                        ctxs.getInstanceNumber());
-            }
-        } else {
-            for (BackendExecStates states : beToExecStates.values()) {
-                result.put(states.brpcAddr.hostname.concat(":").concat("" + 
states.brpcAddr.port),
-                        states.states.size());
-            }
+        for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
+            result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + 
ctxs.brpcAddr.port),
+                    ctxs.getInstanceNumber());
         }
         return result;
     }
@@ -736,163 +719,7 @@ public class Coordinator implements CoordInterface {
         }
 
         updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
-        if (enablePipelineEngine) {
-            sendPipelineCtx();
-        } else {
-            sendFragment();
-        }
-    }
-
-    /**
-     * The logic for sending query plan fragments is as follows:
-     * First, plan fragments are dependent. According to the order in 
"fragments" list,
-     * it must be ensured that on the BE side, the next fragment instance can 
be executed
-     * only after the previous fragment instance is ready,
-     * <p>
-     * In the previous logic, we will send fragment instances in sequence 
through RPC,
-     * and will wait for the RPC of the previous fragment instance to return 
successfully
-     * before sending the next one. But for some complex queries, this may 
lead to too many RPCs.
-     * <p>
-     * The optimized logic is as follows:
-     * 1. If the number of fragment instance is <= 2, the original logic is 
still used
-     * to complete the sending of fragments through at most 2 RPCs.
-     * 2. If the number of fragment instance is >= 3, first group all 
fragments by BE,
-     * and send all fragment instances to the corresponding BE node through 
the FIRST rpc,
-     * but these fragment instances will only perform the preparation phase 
but will not be actually executed.
-     * After that, the execution logic of all fragment instances is started 
through the SECOND RPC.
-     * <p>
-     * After optimization, a query on a BE node will only send two RPCs at 
most.
-     * Thereby reducing the "send fragment timeout" error caused by too many 
RPCs and BE unable to process in time.
-     *
-     * @throws TException
-     * @throws RpcException
-     * @throws UserException
-     */
-    private void sendFragment() throws TException, RpcException, UserException 
{
-        lock();
-        try {
-            Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
-            for (FragmentExecParams params : fragmentExecParamsMap.values()) {
-                for (FInstanceExecParam fi : params.instanceExecParams) {
-                    hostCounter.add(fi.host);
-                }
-            }
-
-            int backendIdx = 0;
-            int profileFragmentId = 0;
-            long memoryLimit = queryOptions.getMemLimit();
-            Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
-            beToExecStates.clear();
-            // If #fragments >=2, use twoPhaseExecution with 
exec_plan_fragments_prepare and exec_plan_fragments_start,
-            // else use exec_plan_fragments directly.
-            // we choose #fragments >=2 because in some cases
-            // we need ensure that A fragment is already prepared to receive 
data before B fragment sends data.
-            // For example: select * from numbers("number"="10") will generate 
ExchangeNode and
-            // TableValuedFunctionScanNode, we should ensure 
TableValuedFunctionScanNode does
-            // not send data until ExchangeNode is ready to receive.
-            boolean twoPhaseExecution = fragments.size() >= 2;
-            for (PlanFragment fragment : fragments) {
-                FragmentExecParams params = 
fragmentExecParamsMap.get(fragment.getFragmentId());
-
-                // 1. set up exec states
-                int instanceNum = params.instanceExecParams.size();
-                Preconditions.checkState(instanceNum > 0);
-                List<TExecPlanFragmentParams> tParams = 
params.toThrift(backendIdx);
-
-                // 2. update memory limit for colocate join
-                if 
(colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
-                    int rate = 
Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
-                    long newMemory = memoryLimit / rate;
-                    // TODO(zxy): The meaning of mem limit in query_options 
has become the real once query mem limit.
-                    // The logic to modify mem_limit here needs to be modified 
or deleted.
-                    for (TExecPlanFragmentParams tParam : tParams) {
-                        tParam.query_options.setMemLimit(newMemory);
-                    }
-                }
-
-                boolean needCheckBackendState = false;
-                if (queryOptions.getQueryType() == TQueryType.LOAD && 
profileFragmentId == 0) {
-                    // this is a load process, and it is the first fragment.
-                    // we should add all BackendExecState of this fragment to 
needCheckBackendExecStates,
-                    // so that we can check these backends' state when joining 
this Coordinator
-                    needCheckBackendState = true;
-                }
-
-                // 3. group BackendExecState by BE. So that we can use one RPC 
to send all fragment instances of a BE.
-                int instanceId = 0;
-                for (TExecPlanFragmentParams tParam : tParams) {
-                    BackendExecState execState =
-                            new BackendExecState(fragment.getFragmentId(), 
instanceId++,
-                                    tParam, this.addressToBackendID, 
executionProfile);
-                    // Each tParam will set the total number of Fragments that 
need to be executed on the same BE,
-                    // and the BE will determine whether all Fragments have 
been executed based on this information.
-                    // Notice. load fragment has a small probability that 
FragmentNumOnHost is 0, for unknown reasons.
-                    
tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
-                    tParam.setBackendId(execState.backend.getId());
-                    tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
-
-                    backendExecStates.add(execState);
-                    if (needCheckBackendState) {
-                        needCheckBackendExecStates.add(execState);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("add need check backend {} for fragment, 
{} job: {}",
-                                    execState.backend.getId(), 
fragment.getFragmentId().asInt(), jobId);
-                        }
-                    }
-
-                    BackendExecStates states = 
beToExecStates.get(execState.backend.getId());
-                    if (states == null) {
-                        states = new 
BackendExecStates(execState.backend.getId(), execState.brpcAddress,
-                                twoPhaseExecution, 
execState.backend.getProcessEpoch());
-                        beToExecStates.putIfAbsent(execState.backend.getId(), 
states);
-                    }
-                    states.addState(execState);
-                    if (tParam.getFragment().getOutputSink() != null
-                            && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
-                        numSinkOnBackend.merge(execState.backend.getId(), 1, 
Integer::sum);
-                    }
-                    ++backendIdx;
-                }
-                int loadStreamPerNode = 1;
-                if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
-                    loadStreamPerNode = 
ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
-                }
-                for (TExecPlanFragmentParams tParam : tParams) {
-                    if (tParam.getFragment().getOutputSink() != null
-                            && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
-                        tParam.setLoadStreamPerNode(loadStreamPerNode);
-                        tParam.setTotalLoadStreams(numSinkOnBackend.size() * 
loadStreamPerNode);
-                        
tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
-                        LOG.info("num local sink for backend {} is {}", 
tParam.getBackendId(),
-                                numSinkOnBackend.get(tParam.getBackendId()));
-                    }
-                }
-                profileFragmentId += 1;
-            } // end for fragments
-
-            // 4. send and wait fragments rpc
-            List<Triple<BackendExecStates, BackendServiceProxy, 
Future<InternalService.PExecPlanFragmentResult>>>
-                    futures = Lists.newArrayList();
-
-            for (BackendExecStates states : beToExecStates.values()) {
-                states.unsetFields();
-                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
-                futures.add(ImmutableTriple.of(states, proxy, 
states.execRemoteFragmentsAsync(proxy)));
-            }
-            waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send fragments");
-
-            if (twoPhaseExecution) {
-                // 5. send and wait execution start rpc
-                futures.clear();
-                for (BackendExecStates states : beToExecStates.values()) {
-                    BackendServiceProxy proxy = 
BackendServiceProxy.getInstance();
-                    futures.add(ImmutableTriple.of(states, proxy, 
states.execPlanFragmentStartAsync(proxy)));
-                }
-                waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send execution start");
-            }
-        } finally {
-            unlock();
-        }
+        sendPipelineCtx();
     }
 
     private void sendPipelineCtx() throws TException, RpcException, 
UserException {
@@ -1059,83 +886,6 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, 
Future<PExecPlanFragmentResult>>> futures,
-                         long leftTimeMs,
-            String operation) throws RpcException, UserException {
-        if (leftTimeMs <= 0) {
-            long currentTimeMillis = System.currentTimeMillis();
-            long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + 
queryOptions.getExecutionTimeout();
-            String msg = String.format(
-                    "timeout before waiting %s rpc, query timeout:%d, already 
elapsed:%d, left for this:%d",
-                    operation, queryOptions.getExecutionTimeout(), elapsed, 
leftTimeMs);
-            LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
-            if (!queryOptions.isSetExecutionTimeout() || 
!queryOptions.isSetQueryTimeout()) {
-                LOG.warn("Query {} does not set timeout info, execution 
timeout: is_set:{}, value:{}"
-                                + ", query timeout: is_set:{}, value: {}, "
-                                + "coordinator timeout deadline {}, cur time 
millis: {}",
-                        DebugUtil.printId(queryId),
-                        queryOptions.isSetExecutionTimeout(), 
queryOptions.getExecutionTimeout(),
-                        queryOptions.isSetQueryTimeout(), 
queryOptions.getQueryTimeout(),
-                        timeoutDeadline, currentTimeMillis);
-            }
-            throw new UserException(msg);
-        }
-
-        long timeoutMs = Math.min(leftTimeMs, 
Config.remote_fragment_exec_timeout_ms);
-        for (Triple<BackendExecStates, BackendServiceProxy, 
Future<PExecPlanFragmentResult>> triple : futures) {
-            TStatusCode code;
-            String errMsg = null;
-            Exception exception = null;
-
-            try {
-                PExecPlanFragmentResult result = 
triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
-                code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
-                        errMsg = result.getStatus().getErrorMsgsList().get(0);
-                    } else {
-                        errMsg = operation + " failed. backend id: " + 
triple.getLeft().beId;
-                    }
-                }
-            } catch (ExecutionException e) {
-                exception = e;
-                code = TStatusCode.THRIFT_RPC_ERROR;
-                triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
-            } catch (InterruptedException e) {
-                exception = e;
-                code = TStatusCode.INTERNAL_ERROR;
-            } catch (TimeoutException e) {
-                exception = e;
-                errMsg = String.format(
-                    "timeout when waiting for %s rpc, query timeout:%d, left 
timeout for this operation:%d",
-                    operation, queryOptions.getExecutionTimeout(), timeoutMs / 
1000);
-                LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
-                code = TStatusCode.TIMEOUT;
-            }
-
-            if (code != TStatusCode.OK) {
-                if (exception != null && errMsg == null) {
-                    errMsg = operation + " failed. " + exception.getMessage();
-                }
-                queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
-                cancelInternal(queryStatus);
-                switch (code) {
-                    case TIMEOUT:
-                        
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
-                                .increase(1L);
-                        throw new 
RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
-                    case THRIFT_RPC_ERROR:
-                        
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
-                                .increase(1L);
-                        SimpleScheduler.addToBlacklist(triple.getLeft().beId, 
errMsg);
-                        throw new 
RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
-                    default:
-                        throw new UserException(errMsg, exception);
-                }
-            }
-        }
-    }
-
     private void waitPipelineRpc(List<Triple<PipelineExecContexts, 
BackendServiceProxy,
             Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
             String operation) throws RpcException, UserException {
@@ -1413,56 +1163,30 @@ public class Coordinator implements CoordInterface {
 
         try {
             lock();
-
-            if (queryOptions.isEnablePipelineEngine()) {
-                for (PipelineExecContext pipelineExecContext : 
pipelineExecContexts.values()) {
-                    Backend be = 
curBeMap.get(pipelineExecContext.backend.getId());
-                    if (be == null || !be.isAlive()) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend {} not exists or dead, query {} 
should be cancelled",
-                                pipelineExecContext.backend.toString(), 
DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    }
-
-                    // Backend process epoch changed, indicates that this be 
restarts, query should be cancelled.
-                    // Check zero since during upgrading, older version oplog 
will not persistent be start time
-                    // so newer version follower will get zero epoch when 
replaying oplog or snapshot
-                    if (pipelineExecContext.beProcessEpoch != 
be.getProcessEpoch() && be.getProcessEpoch() != 0) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend process epoch changed, previous {} 
now {}, "
-                                + "means this be has already restarted, should 
cancel this coordinator,"
-                                + "query id {}", 
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
-                                DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    } else if (be.getProcessEpoch() == 0) {
-                        LOG.warn("Backend {} has zero process epoch, maybe we 
are upgrading cluster?",
-                                be.toString());
-                    }
-                }
-            } else {
-                // beToExecStates will be updated only in non-pipeline query.
-                for (BackendExecStates beExecState : beToExecStates.values()) {
-                    Backend be = curBeMap.get(beExecState.beId);
-                    if (be == null || !be.isAlive()) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Backend {} not exists or dead, query {} 
should be cancelled.",
-                                beExecState.beId, DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    }
-
-                    if (beExecState.beProcessEpoch != be.getProcessEpoch() && 
be.getProcessEpoch() != 0) {
-                        Status errorStatus = new Status(TStatusCode.CANCELLED,
-                                "Process epoch changed, previous {} now {}, 
means this be has already restarted,"
-                                + "should cancel this coordinator, query id 
{}",
-                                beExecState.beProcessEpoch, 
be.getProcessEpoch(), DebugUtil.printId(queryId));
-                        LOG.warn(errorStatus.getErrorMsg());
-                        return errorStatus;
-                    } else if (be.getProcessEpoch() == 0) {
-                        LOG.warn("Backend {} has zero process epoch, maybe we 
are upgrading cluster?", be.toString());
-                    }
+            for (PipelineExecContext pipelineExecContext : 
pipelineExecContexts.values()) {
+                Backend be = curBeMap.get(pipelineExecContext.backend.getId());
+                if (be == null || !be.isAlive()) {
+                    Status errorStatus = new Status(TStatusCode.CANCELLED,
+                            "Backend {} not exists or dead, query {} should be 
cancelled",
+                            pipelineExecContext.backend.toString(), 
DebugUtil.printId(queryId));
+                    LOG.warn(errorStatus.getErrorMsg());
+                    return errorStatus;
+                }
+
+                // Backend process epoch changed, indicates that this be 
restarts, query should be cancelled.
+                // Check zero since during upgrading, older version oplog will 
not persistent be start time
+                // so newer version follower will get zero epoch when 
replaying oplog or snapshot
+                if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() 
&& be.getProcessEpoch() != 0) {
+                    Status errorStatus = new Status(TStatusCode.CANCELLED,
+                            "Backend process epoch changed, previous {} now 
{}, "
+                            + "means this be has already restarted, should 
cancel this coordinator,"
+                            + "query id {}", 
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
+                            DebugUtil.printId(queryId));
+                    LOG.warn(errorStatus.getErrorMsg());
+                    return errorStatus;
+                } else if (be.getProcessEpoch() == 0) {
+                    LOG.warn("Backend {} has zero process epoch, maybe we are 
upgrading cluster?",
+                            be.toString());
                 }
             }
 
@@ -1538,14 +1262,8 @@ public class Coordinator implements CoordInterface {
     }
 
     private void cancelRemoteFragmentsAsync(Status cancelReason) {
-        if (enablePipelineEngine) {
-            for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
-                ctx.cancelQuery(cancelReason);
-            }
-        } else {
-            for (BackendExecStates backendExecState : beToExecStates.values()) 
{
-                backendExecState.cancelQuery(cancelReason);
-            }
+        for (PipelineExecContexts ctx : beToPipelineExecCtxs.values()) {
+            ctx.cancelQuery(cancelReason);
         }
     }
 
@@ -2546,201 +2264,63 @@ public class Coordinator implements CoordInterface {
 
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        if (enablePipelineXEngine) {
-            PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
-            if (ctx == null || !ctx.updatePipelineStatus(params)) {
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the 
error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already 
been initiated)
-            if (!status.ok()) {
-                if (returnedAllResults && status.isCancelled()) {
-                    LOG.warn("Query {} has returned all results, 
fragment_id={} instance_id={}, be={}"
-                            + " is reporting failed status {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(),
-                            status.toString());
-                } else {
-                    LOG.warn("one instance report fail, query_id={} 
fragment_id={} instance_id={}, be={},"
-                                    + " error message: {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(), status.toString());
-                    updateStatus(status);
-                }
-            }
-            if (params.isSetDeltaUrls()) {
-                updateDeltas(params.getDeltaUrls());
-            }
-            if (params.isSetLoadCounters()) {
-                updateLoadCounters(params.getLoadCounters());
-            }
-            if (params.isSetTrackingUrl()) {
-                trackingUrl = params.getTrackingUrl();
-            }
-            if (params.isSetExportFiles()) {
-                updateExportFiles(params.getExportFiles());
-            }
-            if (params.isSetCommitInfos()) {
-                updateCommitInfos(params.getCommitInfos());
-            }
-            if (params.isSetErrorTabletInfos()) {
-                updateErrorTabletInfos(params.getErrorTabletInfos());
-            }
-            if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc 
!= null) {
-                
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-            }
-            if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != 
null) {
-                icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-            }
-
-            if (ctx.done) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} fragment {} is marked done",
-                            DebugUtil.printId(queryId), ctx.fragmentId);
-                }
-                fragmentsDoneLatch.markedCountDown(params.getFragmentId(), 
params.getBackendId());
-            }
-        } else if (enablePipelineEngine) {
-            PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
-            if (ctx == null || !ctx.updatePipelineStatus(params)) {
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the 
error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already 
been initiated)
-            if (!status.ok()) {
-                if (returnedAllResults && status.isCancelled()) {
-                    LOG.warn("Query {} has returned all results, 
fragment_id={} instance_id={}, be={}"
-                            + " is reporting failed status {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(),
-                            status.toString());
-                } else {
-                    LOG.warn("one instance report fail, query_id={} 
fragment_id={} instance_id={}, be={},"
-                                    + " error message: {}",
-                            DebugUtil.printId(queryId), params.getFragmentId(),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            params.getBackendId(), status.toString());
-                    updateStatus(status);
-                }
-            }
+        PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
+        if (ctx == null || !ctx.updatePipelineStatus(params)) {
+            return;
+        }
 
-            // params.isDone() should be promised.
-            // There are some periodic reports during the load process,
-            // and the reports from the intermediate process may be concurrent 
with the last report.
-            // The last report causes the counter to decrease to zero,
-            // but it is possible that the report without commit-info 
triggered the commit operation,
-            // resulting in the data not being published.
-            if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) && 
params.isDone()) {
-                if (params.isSetDeltaUrls()) {
-                    updateDeltas(params.getDeltaUrls());
-                }
-                if (params.isSetLoadCounters()) {
-                    updateLoadCounters(params.getLoadCounters());
-                }
-                if (params.isSetTrackingUrl()) {
-                    trackingUrl = params.getTrackingUrl();
-                }
-                if (params.isSetExportFiles()) {
-                    updateExportFiles(params.getExportFiles());
-                }
-                if (params.isSetCommitInfos()) {
-                    updateCommitInfos(params.getCommitInfos());
-                }
-                if (params.isSetErrorTabletInfos()) {
-                    updateErrorTabletInfos(params.getErrorTabletInfos());
-                }
-                if (params.isSetHivePartitionUpdates() && 
hivePartitionUpdateFunc != null) {
-                    
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-                }
-                if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc 
!= null) {
-                    
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} instance {} is marked done",
-                            DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
-                }
-                
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
+        Status status = new Status(params.status);
+        // for now, abort the query if we see any error except if the error is 
cancelled
+        // and returned_all_results_ is true.
+        // (UpdateStatus() initiates cancellation, if it hasn't already been 
initiated)
+        if (!status.ok()) {
+            if (returnedAllResults && status.isCancelled()) {
+                LOG.warn("Query {} has returned all results, fragment_id={} 
instance_id={}, be={}"
+                        + " is reporting failed status {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(),
+                        status.toString());
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Query {} instance {} is not marked done",
-                            DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
-                }
-            }
-        } else {
-            if (params.backend_num >= backendExecStates.size()) {
-                LOG.warn("Query {} instance {} unknown backend number: {}, 
expected less than: {}",
-                        DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.backend_num, backendExecStates.size());
-                return;
-            }
-            BackendExecState execState = 
backendExecStates.get(params.backend_num);
-            if (!execState.updateInstanceStatus(params)) {
-                // Has to return here, to avoid out of order report messages. 
For example,
-                // the first message is done, then we update commit messages, 
but the new
-                // message is running, then we will also update commit 
messages. It will
-                // lead to data corrupt.
-                return;
-            }
-
-            Status status = new Status(params.status);
-            // for now, abort the query if we see any error except if the 
error is cancelled
-            // and returned_all_results_ is true.
-            // (UpdateStatus() initiates cancellation, if it hasn't already 
been initiated)
-            if (!status.ok()) {
-                if (status.isCancelled() && returnedAllResults) {
-                    LOG.warn("Query {} has returned all results, its instance 
{} is reporting failed status {}",
-                            DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.toString());
-                } else {
-                    LOG.warn("Instance {} of query {} report failed status, 
error msg: {}",
-                            DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.toString());
-                    updateStatus(status);
-                }
+                LOG.warn("one instance report fail, query_id={} fragment_id={} 
instance_id={}, be={},"
+                                + " error message: {}",
+                        DebugUtil.printId(queryId), params.getFragmentId(),
+                        DebugUtil.printId(params.getFragmentInstanceId()),
+                        params.getBackendId(), status.toString());
+                updateStatus(status);
             }
+        }
+        if (params.isSetDeltaUrls()) {
+            updateDeltas(params.getDeltaUrls());
+        }
+        if (params.isSetLoadCounters()) {
+            updateLoadCounters(params.getLoadCounters());
+        }
+        if (params.isSetTrackingUrl()) {
+            trackingUrl = params.getTrackingUrl();
+        }
+        if (params.isSetExportFiles()) {
+            updateExportFiles(params.getExportFiles());
+        }
+        if (params.isSetCommitInfos()) {
+            updateCommitInfos(params.getCommitInfos());
+        }
+        if (params.isSetErrorTabletInfos()) {
+            updateErrorTabletInfos(params.getErrorTabletInfos());
+        }
+        if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != 
null) {
+            hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
+        }
+        if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) 
{
+            icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
+        }
 
-            // params.isDone() should be promised.
-            // There are some periodic reports during the load process,
-            // and the reports from the intermediate process may be concurrent 
with the last report.
-            // The last report causes the counter to decrease to zero,
-            // but it is possible that the report without commit-info 
triggered the commit operation,
-            // resulting in the data not being published.
-            if (execState.done && params.isDone()) {
-                if (params.isSetDeltaUrls()) {
-                    updateDeltas(params.getDeltaUrls());
-                }
-                if (params.isSetLoadCounters()) {
-                    updateLoadCounters(params.getLoadCounters());
-                }
-                if (params.isSetTrackingUrl()) {
-                    trackingUrl = params.getTrackingUrl();
-                }
-                if (params.isSetExportFiles()) {
-                    updateExportFiles(params.getExportFiles());
-                }
-                if (params.isSetCommitInfos()) {
-                    updateCommitInfos(params.getCommitInfos());
-                }
-                if (params.isSetErrorTabletInfos()) {
-                    updateErrorTabletInfos(params.getErrorTabletInfos());
-                }
-                if (params.isSetHivePartitionUpdates() && 
hivePartitionUpdateFunc != null) {
-                    
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
-                }
-                if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc 
!= null) {
-                    
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
-                }
-                
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
+        if (ctx.done) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Query {} fragment {} is marked done",
+                        DebugUtil.printId(queryId), ctx.fragmentId);
             }
+            fragmentsDoneLatch.markedCountDown(params.getFragmentId(), 
params.getBackendId());
         }
 
         if (params.isSetLoadedRows() && jobId != -1) {
@@ -2800,21 +2380,11 @@ public class Coordinator implements CoordInterface {
      * return true if all of them are OK. Otherwise, return false.
      */
     private boolean checkBackendState() {
-        if (enablePipelineEngine) {
-            for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
-                if (!ctx.isBackendStateHealthy()) {
-                    queryStatus = new Status(TStatusCode.INTERNAL_ERROR, 
"backend "
-                            + ctx.backend.getId() + " is down");
-                    return false;
-                }
-            }
-        } else {
-            for (BackendExecState backendExecState : 
needCheckBackendExecStates) {
-                if (!backendExecState.isBackendStateHealthy()) {
-                    queryStatus = new Status(TStatusCode.INTERNAL_ERROR, 
"backend "
-                            + backendExecState.backend.getId() + " is down");
-                    return false;
-                }
+        for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
+            if (!ctx.isBackendStateHealthy()) {
+                queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend "
+                        + ctx.backend.getId() + " is down");
+                return false;
             }
         }
         return true;
@@ -3128,85 +2698,6 @@ public class Coordinator implements CoordInterface {
     private final BucketShuffleJoinController bucketShuffleJoinController
             = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
 
-    // record backend execute state
-    // TODO(zhaochun): add profile information and others
-    public class BackendExecState {
-        TExecPlanFragmentParams rpcParams;
-        PlanFragmentId fragmentId;
-        boolean initiated;
-        volatile boolean done;
-        TNetworkAddress brpcAddress;
-        TNetworkAddress address;
-        Backend backend;
-        long lastMissingHeartbeatTime = -1;
-        TUniqueId instanceId;
-
-        public BackendExecState(PlanFragmentId fragmentId, int instanceId,
-                                TExecPlanFragmentParams rpcParams, 
Map<TNetworkAddress, Long> addressToBackendID,
-                                ExecutionProfile executionProfile) {
-            this.fragmentId = fragmentId;
-            this.rpcParams = rpcParams;
-            this.initiated = false;
-            this.done = false;
-            FInstanceExecParam fi = 
fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId);
-            this.instanceId = fi.instanceId;
-            this.address = fi.host;
-            this.backend = idToBackend.get(addressToBackendID.get(address));
-            this.brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-            this.lastMissingHeartbeatTime = 
backend.getLastMissingHeartbeatTime();
-            String profileName = "Instance " + DebugUtil.printId(
-                    fi.instanceId) + " (host=" + 
this.backend.getHeartbeatAddress() + ")";
-            RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
-            executionProfile.addInstanceProfile(fragmentId, fi.instanceId, 
instanceProfile);
-        }
-
-        /**
-         * Some information common to all Fragments does not need to be sent 
repeatedly.
-         * Therefore, when we confirm that a certain BE has accepted the 
information,
-         * we will delete the information in the subsequent Fragment to avoid 
repeated sending.
-         * This information can be obtained from the cache of BE.
-         */
-        public void unsetFields() {
-            this.rpcParams.unsetDescTbl();
-            this.rpcParams.unsetFileScanParams();
-            this.rpcParams.unsetCoord();
-            this.rpcParams.unsetQueryGlobals();
-            this.rpcParams.unsetResourceInfo();
-            this.rpcParams.setIsSimplifiedParam(true);
-        }
-
-        // update the instance status, if it is already finished, then not 
update any more.
-        public synchronized boolean 
updateInstanceStatus(TReportExecStatusParams params) {
-            if (this.done) {
-                // duplicate packet
-                return false;
-            }
-            this.done = params.done;
-            if (statsErrorEstimator != null) {
-                statsErrorEstimator.updateExactReturnedRows(params);
-            }
-            return true;
-        }
-
-        public boolean isBackendStateHealthy() {
-            if (backend.getLastMissingHeartbeatTime() > 
lastMissingHeartbeatTime && !backend.isAlive()) {
-                LOG.warn("backend {} is down while joining the coordinator. 
job id: {}",
-                        backend.getId(), jobId);
-                return false;
-            }
-            return true;
-        }
-
-        public FragmentInstanceInfo buildFragmentInstanceInfo() {
-            return new 
QueryStatisticsItem.FragmentInstanceInfo.Builder().instanceId(fragmentInstanceId())
-                    
.fragmentId(String.valueOf(fragmentId)).address(this.address).build();
-        }
-
-        private TUniqueId fragmentInstanceId() {
-            return this.rpcParams.params.getFragmentInstanceId();
-        }
-    }
-
     public class PipelineExecContext {
         TPipelineFragmentParams rpcParams;
         PlanFragmentId fragmentId;
@@ -3272,31 +2763,12 @@ public class Coordinator implements CoordInterface {
             if (!params.done) {
                 return false;
             }
-            if (enablePipelineX) {
-                if (this.done) {
-                    // duplicate packet
-                    return false;
-                }
-                this.done = true;
-                return true;
-            } else {
-                // could not find the related instances, not update and return 
false, to indicate
-                // that the caller should not update any more.
-                if 
(!fragmentInstancesMap.containsKey(params.fragment_instance_id)) {
-                    return false;
-                }
-                Boolean instanceDone = 
fragmentInstancesMap.get(params.fragment_instance_id);
-                if (instanceDone) {
-                    // duplicate packet
-                    return false;
-                }
-                fragmentInstancesMap.put(params.fragment_instance_id, true);
-                profileReportProgress++;
-                if (profileReportProgress == numInstances) {
-                    this.done = true;
-                }
-                return true;
+            if (this.done) {
+                // duplicate packet
+                return false;
             }
+            this.done = true;
+            return true;
         }
 
         public boolean isBackendStateHealthy() {
@@ -3315,159 +2787,6 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    /**
-     * A set of BackendExecState for same Backend
-     */
-    public class BackendExecStates {
-        long beId;
-        TNetworkAddress brpcAddr;
-        List<BackendExecState> states = Lists.newArrayList();
-        boolean twoPhaseExecution = false;
-        long beProcessEpoch = 0;
-        boolean hasCancelled = false;
-        boolean cancelInProcess = false;
-
-        public BackendExecStates(long beId, TNetworkAddress brpcAddr, boolean 
twoPhaseExecution, long beProcessEpoch) {
-            this.beId = beId;
-            this.brpcAddr = brpcAddr;
-            this.twoPhaseExecution = twoPhaseExecution;
-            this.beProcessEpoch = beProcessEpoch;
-        }
-
-        public void addState(BackendExecState state) {
-            this.states.add(state);
-        }
-
-        /**
-         * The BackendExecState in states are all send to the same BE.
-         * So only the first BackendExecState need to carry some common 
fields, such as DescriptorTbl,
-         * the other BackendExecState does not need those fields. Unset them 
to reduce size.
-         */
-        public void unsetFields() {
-            boolean first = true;
-            for (BackendExecState state : states) {
-                if (first) {
-                    first = false;
-                    continue;
-                }
-                state.unsetFields();
-            }
-        }
-
-        public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentsAsync(BackendServiceProxy proxy)
-                throws TException {
-            try {
-                TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
-                for (BackendExecState state : states) {
-                    state.initiated = true;
-                    paramsList.addToParamsList(state.rpcParams);
-                }
-                return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, 
twoPhaseExecution);
-            } catch (RpcException e) {
-                // DO NOT throw exception here, return a complete future with 
error code,
-                // so that the following logic will cancel the fragment.
-                return futureWithException(e);
-            }
-        }
-
-        public Future<InternalService.PExecPlanFragmentResult> 
execPlanFragmentStartAsync(BackendServiceProxy proxy)
-                throws TException {
-            try {
-                PExecPlanFragmentStartRequest.Builder builder = 
PExecPlanFragmentStartRequest.newBuilder();
-                PUniqueId qid = 
PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
-                builder.setQueryId(qid);
-                return proxy.execPlanFragmentStartAsync(brpcAddr, 
builder.build());
-            } catch (RpcException e) {
-                // DO NOT throw exception here, return a complete future with 
error code,
-                // so that the following logic will cancel the fragment.
-                return futureWithException(e);
-            }
-        }
-
-        @NotNull
-        private Future<PExecPlanFragmentResult> 
futureWithException(RpcException e) {
-            return new Future<PExecPlanFragmentResult>() {
-                @Override
-                public boolean cancel(boolean mayInterruptIfRunning) {
-                    return false;
-                }
-
-                @Override
-                public boolean isCancelled() {
-                    return false;
-                }
-
-                @Override
-                public boolean isDone() {
-                    return true;
-                }
-
-                @Override
-                public PExecPlanFragmentResult get() {
-                    PExecPlanFragmentResult result = 
PExecPlanFragmentResult.newBuilder().setStatus(
-                            
Types.PStatus.newBuilder().addErrorMsgs(e.getMessage())
-                                    
.setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue()).build()).build();
-                    return result;
-                }
-
-                @Override
-                public PExecPlanFragmentResult get(long timeout, TimeUnit 
unit) {
-                    return get();
-                }
-            };
-        }
-
-        // cancel the fragment instance.
-        // return true if cancel success. Otherwise, return false
-        public synchronized void cancelQuery(Status cancelReason) {
-            LOG.warn("cancelRemoteFragments  backend: {}, reason: {}",
-                    idToBackend.get(beId), cancelReason.toString());
-            try {
-                if (this.hasCancelled || this.cancelInProcess) {
-                    LOG.info("Fragment instance has already been cancelled {} 
or under cancel {}."
-                                    + " backend: {}, reason: {}",
-                            this.hasCancelled, this.cancelInProcess,
-                            idToBackend.get(beId), cancelReason.toString());
-                    return;
-                }
-                try {
-                    
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
-                            
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddr, 
cancelReason);
-                    Futures.addCallback(cancelResult, new 
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
-                        public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
-                            cancelInProcess = false;
-                            if (result.hasStatus()) {
-                                Status status = new Status(result.getStatus());
-                                if (status.getErrorCode() == TStatusCode.OK) {
-                                    hasCancelled = true;
-                                } else {
-                                    LOG.warn("Failed to cancel query {}  
backend: {}, reason: {}",
-                                            DebugUtil.printId(queryId), 
idToBackend.get(beId), status.toString());
-                                }
-                            }
-                            LOG.warn("Failed to cancel query {} backend: {}, 
without status",
-                                    DebugUtil.printId(queryId), 
idToBackend.get(beId));
-                        }
-
-                        public void onFailure(Throwable t) {
-                            cancelInProcess = false;
-                            LOG.warn("Failed to cancel query {} backend: {}, 
reason: {}",
-                                    DebugUtil.printId(queryId), 
idToBackend.get(beId), cancelReason.toString(), t);
-                        }
-                    }, backendRpcCallbackExecutor);
-                    cancelInProcess = true;
-                } catch (RpcException e) {
-                    LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddr.getHostname(),
-                            brpcAddr.getPort());
-                    
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddr), 
e.getMessage());
-                }
-
-            } catch (Exception e) {
-                LOG.warn("catch a exception", e);
-            }
-        }
-    }
-
     public class PipelineExecContexts {
         long beId;
         TNetworkAddress brpcAddr;
@@ -4012,26 +3331,13 @@ public class Coordinator implements CoordInterface {
                 Lists.newArrayList();
         lock();
         try {
-            if (enablePipelineEngine) {
-                for (int index = 0; index < fragments.size(); index++) {
-                    for (PipelineExecContext ctx : 
pipelineExecContexts.values()) {
-                        if (fragments.get(index).getFragmentId() != 
ctx.fragmentId) {
-                            continue;
-                        }
-                        final List<QueryStatisticsItem.FragmentInstanceInfo> 
info = ctx.buildFragmentInstanceInfo();
-                        result.addAll(info);
-                    }
-                }
-            } else {
-                for (int index = 0; index < fragments.size(); index++) {
-                    for (BackendExecState backendExecState : 
backendExecStates) {
-                        if (fragments.get(index).getFragmentId() != 
backendExecState.fragmentId) {
-                            continue;
-                        }
-                        final QueryStatisticsItem.FragmentInstanceInfo info =
-                                backendExecState.buildFragmentInstanceInfo();
-                        result.add(info);
+            for (int index = 0; index < fragments.size(); index++) {
+                for (PipelineExecContext ctx : pipelineExecContexts.values()) {
+                    if (fragments.get(index).getFragmentId() != 
ctx.fragmentId) {
+                        continue;
                     }
+                    final List<QueryStatisticsItem.FragmentInstanceInfo> info 
= ctx.buildFragmentInstanceInfo();
+                    result.addAll(info);
                 }
             }
         } finally {
@@ -4043,16 +3349,9 @@ public class Coordinator implements CoordInterface {
     @Override
     public List<TNetworkAddress> getInvolvedBackends() {
         List<TNetworkAddress> backendAddresses = Lists.newArrayList();
-        if (this.enablePipelineXEngine) {
-            for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
-                Backend backend = idToBackend.get(backendId);
-                backendAddresses.add(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
-            }
-        } else {
-            for (Long backendId : this.beToExecStates.keySet()) {
-                Backend backend = idToBackend.get(backendId);
-                backendAddresses.add(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
-            }
+        for (Long backendId : this.beToPipelineExecCtxs.keySet()) {
+            Backend backend = idToBackend.get(backendId);
+            backendAddresses.add(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
         }
         return backendAddresses;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to