This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new edb8931f538 [Test](Export) add some debug logs for export (#47400) edb8931f538 is described below commit edb8931f538b1e6050222bbe591693fe92befda9 Author: Tiewei Fang <fangtie...@selectdb.com> AuthorDate: Wed Feb 5 18:46:34 2025 +0800 [Test](Export) add some debug logs for export (#47400) add some debug logs for export --- .../org/apache/doris/load/ExportTaskExecutor.java | 34 ++++++++++++++++++++++ regression-test/pipeline/p0/conf/fe.conf | 2 +- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index 94f432f1c16..b4923337402 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -41,6 +41,8 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; @@ -50,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public class ExportTaskExecutor implements TransientTaskExecutor { + private static final Logger LOG = LogManager.getLogger(ExportTaskExecutor.class); List<StatementBase> selectStmtLists; @@ -78,22 +81,32 @@ public class ExportTaskExecutor implements TransientTaskExecutor { @Override public void execute() throws JobException { + LOG.debug("[Export Task] taskId: {} starting execution", taskId); if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} was already canceled before execution", taskId); throw new JobException("Export executor has been canceled, task id: {}", taskId); } + LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", taskId); exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null); List<OutfileInfo> outfileInfoList = Lists.newArrayList(); for (int idx = 0; idx < selectStmtLists.size(); ++idx) { + LOG.debug("[Export Task] taskId: {} processing statement {}/{}", + taskId, idx + 1, selectStmtLists.size()); if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} canceled during execution at statement {}", taskId, idx + 1); throw new JobException("Export executor has been canceled, task id: {}", taskId); } // check the version of tablets, skip if the consistency is in partition level. if (exportJob.getExportTable().isManagedTable() && !exportJob.isPartitionConsistency()) { + LOG.debug("[Export Task] taskId: {} checking tablet versions for statement {}", taskId, idx + 1); try { Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException( exportJob.getTableName().getDb()); OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl()); + LOG.debug("[Export Lock] taskId: {}, table: {} about to acquire readLock", + taskId, table.getName()); table.readLock(); + LOG.debug("[Export Lock] taskId: {}, table: {} acquired readLock", taskId, table.getName()); try { List<Long> tabletIds; LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx); @@ -108,6 +121,8 @@ public class ExportTaskExecutor implements TransientTaskExecutor { long nowVersion = partition.getVisibleVersion(); long oldVersion = exportJob.getPartitionToVersion().get(partition.getName()); if (nowVersion != oldVersion) { + LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock" + + "due to version mismatch", taskId, table.getName()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed"); throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}" @@ -115,11 +130,17 @@ public class ExportTaskExecutor implements TransientTaskExecutor { } } } catch (Exception e) { + LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock" + + "due to exception: {}", taskId, table.getName(), e.getMessage()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()); throw new JobException(e); } finally { + LOG.debug("[Export Lock] taskId: {}, table: {} releasing readLock in finally block", + taskId, table.getName()); table.readUnlock(); + LOG.debug("[Export Lock] taskId: {}, table: {} released readLock successfully", + taskId, table.getName()); } } catch (AnalysisException e) { exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, @@ -129,26 +150,39 @@ public class ExportTaskExecutor implements TransientTaskExecutor { } try (AutoCloseConnectContext r = buildConnectContext()) { + LOG.debug("[Export Task] taskId: {} executing statement {}", taskId, idx + 1); stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx)); stmtExecutor.execute(); if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) { + LOG.debug("[Export Task] taskId: {} failed with MySQL error: {}", taskId, + r.connectContext.getState().getErrorMessage()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage()); return; } + LOG.debug("[Export Task] taskId: {} statement {} executed successfully", taskId, idx + 1); OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); + LOG.debug("[Export Task] taskId: {} got outfile info for statement {}:" + + "fileNumber={}, totalRows={}, fileSize={}", + taskId, idx + 1, outfileInfo.getFileNumber(), + outfileInfo.getTotalRows(), outfileInfo.getFileSize()); outfileInfoList.add(outfileInfo); } catch (Exception e) { + LOG.debug("[Export Task] taskId: {} failed with exception during statement {}: {}", + taskId, idx + 1, e.getMessage(), e); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()); throw new JobException(e); } } if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} canceled after processing all statements", taskId); throw new JobException("Export executor has been canceled, task id: {}", taskId); } + LOG.debug("[Export Task] taskId: {} completed successfully, updating state to FINISHED", taskId); exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null); isFinished.getAndSet(true); + LOG.debug("[Export Task] taskId: {} execution completed", taskId); } @Override diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 2f493ff1098..8c309a7d2c4 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -34,7 +34,7 @@ JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m sys_log_level = INFO sys_log_mode = NORMAL -sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl +sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl,org.apache.doris.load.ExportTaskExecutor arrow_flight_sql_port = 8081 catalog_trash_expire_second=1 #enable ssl for test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org