This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c34f5045c8835412d80eb4e67d3a79242e0cddc7 Author: yiguolei <yiguo...@gmail.com> AuthorDate: Sat Mar 16 21:37:02 2024 +0800 fix compile --- .../commands/insert/AbstractInsertExecutor.java | 194 --------------------- 1 file changed, 194 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java deleted file mode 100644 index 77eeef22ebf..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ /dev/null @@ -1,194 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.plans.commands.insert; - -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.EnvFactory; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.nereids.NereidsPlanner; -import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; -import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.Coordinator; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.qe.QeProcessorImpl.QueryInfo; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.task.LoadEtlTask; -import org.apache.doris.thrift.TQueryType; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Optional; - -/** - * Abstract insert executor. - * The derived class should implement the abstract method for certain type of target table - */ -public abstract class AbstractInsertExecutor { - private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class); - protected long jobId; - protected final ConnectContext ctx; - protected final Coordinator coordinator; - protected String labelName; - protected final DatabaseIf database; - protected final TableIf table; - protected final long createTime = System.currentTimeMillis(); - protected long loadedRows = 0; - protected int filteredRows = 0; - - protected String errMsg = ""; - protected Optional<InsertCommandContext> insertCtx; - - /** - * Constructor - */ - public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, - Optional<InsertCommandContext> insertCtx) { - this.ctx = ctx; - this.coordinator = EnvFactory.getInstance().createCoordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); - this.labelName = labelName; - this.table = table; - this.database = table.getDatabase(); - this.insertCtx = insertCtx; - } - - public Coordinator getCoordinator() { - return coordinator; - } - - public DatabaseIf getDatabase() { - return database; - } - - public TableIf getTable() { - return table; - } - - public String getLabelName() { - return labelName; - } - - /** - * begin transaction if necessary - */ - public abstract void beginTransaction(); - - /** - * finalize sink to complete enough info for sink execution - */ - protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink); - - /** - * Do something before exec - */ - protected abstract void beforeExec(); - - /** - * Do something after exec finished - */ - protected abstract void onComplete() throws UserException; - - /** - * Do something when exec throw exception - */ - protected abstract void onFail(Throwable t); - - /** - * Do something after exec - */ - protected abstract void afterExec(StmtExecutor executor); - - protected final void execImpl(StmtExecutor executor, long jobId) throws Exception { - String queryId = DebugUtil.printId(ctx.queryId()); - coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); - coordinator.setQueryType(TQueryType.LOAD); - executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); - QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); - QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); - coordinator.exec(); - int execTimeout = ctx.getExecTimeout(); - if (LOG.isDebugEnabled()) { - LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout); - } - boolean notTimeout = coordinator.join(execTimeout); - if (!coordinator.isDone()) { - coordinator.cancel(); - if (notTimeout) { - errMsg = coordinator.getExecStatus().getErrorMsg(); - ErrorReport.reportDdlException("there exists unhealthy backend. " - + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } else { - ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); - } - } - if (!coordinator.getExecStatus().ok()) { - errMsg = coordinator.getExecStatus().getErrorMsg(); - LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg); - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - if (LOG.isDebugEnabled()) { - LOG.debug("insert [{}] with query id {} delta files is {}", - labelName, queryId, coordinator.getDeltaUrls()); - } - if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { - loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); - } - if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { - filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); - } - } - - private boolean checkStrictMode() { - // if in strict mode, insert will fail if there are filtered rows - if (ctx.getSessionVariable().getEnableInsertStrict()) { - if (filteredRows > 0) { - ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT, - "Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl()); - return false; - } - } - return true; - } - - /** - * execute insert txn for insert into select command. - */ - public void executeSingleInsert(StmtExecutor executor, long jobId) { - beforeExec(); - try { - execImpl(executor, jobId); - if (!checkStrictMode()) { - return; - } - onComplete(); - } catch (Throwable t) { - onFail(t); - return; - } finally { - executor.updateProfile(true); - QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId()); - } - afterExec(executor); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org