This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 40ed235dcca [opt](fe) exit FE when transfer to (non)master failed (#34809) 40ed235dcca is described below commit 40ed235dccae5133a0d82aeab02c39f7652addb2 Author: Mingyu Chen <morning...@163.com> AuthorDate: Sun May 19 08:15:26 2024 +0800 [opt](fe) exit FE when transfer to (non)master failed (#34809) * [opt](fe) exit FE when transfer to (non)master failed * [opt](fe) exit FE when transfer to (non)master failed * [opt](fe) exit FE when transfer to (non)master failed --- be/src/vec/exec/scan/vfile_scanner.cpp | 3 +- .../main/java/org/apache/doris/catalog/Env.java | 301 +++++++++++---------- 2 files changed, 160 insertions(+), 144 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e518e6068c7..421f7ab5b22 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -965,8 +965,7 @@ Status VFileScanner::_get_next_reader() { COUNTER_UPDATE(_empty_file_counter, 1); continue; } else if (!init_status.ok()) { - return Status::InternalError("failed to init reader for file {}, err: {}", range.path, - init_status.to_string()); + return Status::InternalError("failed to init reader, err: {}", init_status.to_string()); } _name_to_col_type.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 26e24c12ba0..77aa7a2f2b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1461,148 +1461,158 @@ public class Env { @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"}) private void transferToMaster() { - // stop replayer - if (replayer != null) { - replayer.exit(); - try { - replayer.join(); - } catch (InterruptedException e) { - LOG.warn("got exception when stopping the replayer thread", e); + try { + // stop 
replayer + if (replayer != null) { + replayer.exit(); + try { + replayer.join(); + } catch (InterruptedException e) { + LOG.warn("got exception when stopping the replayer thread", e); + } + replayer = null; } - replayer = null; - } - // set this after replay thread stopped. to avoid replay thread modify them. - isReady.set(false); - canRead.set(false); + // set this after replay thread stopped. to avoid replay thread modify them. + isReady.set(false); + canRead.set(false); - toMasterProgress = "open editlog"; - editLog.open(); + toMasterProgress = "open editlog"; + editLog.open(); - if (Config.edit_log_type.equalsIgnoreCase("bdb")) { - if (!haProtocol.fencing()) { - LOG.error("fencing failed. will exit."); - System.exit(-1); + if (Config.edit_log_type.equalsIgnoreCase("bdb")) { + if (!haProtocol.fencing()) { + LOG.error("fencing failed. will exit."); + System.exit(-1); + } } - } - toMasterProgress = "replay journal"; - long replayStartTime = System.currentTimeMillis(); - // replay journals. -1 means replay all the journals larger than current journal id. - replayJournal(-1); - long replayEndTime = System.currentTimeMillis(); - LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec"); + toMasterProgress = "replay journal"; + long replayStartTime = System.currentTimeMillis(); + // replay journals. -1 means replay all the journals larger than current journal id. 
+ replayJournal(-1); + long replayEndTime = System.currentTimeMillis(); + LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec"); - if (Config.enable_check_compatibility_mode) { - String msg = "check metadata compatibility successfully"; - LOG.info(msg); - System.out.println(msg); - System.exit(0); - } + if (Config.enable_check_compatibility_mode) { + String msg = "check metadata compatibility successfully"; + LOG.info(msg); + System.out.println(msg); + System.exit(0); + } - checkCurrentNodeExist(); + checkCurrentNodeExist(); - checkBeExecVersion(); + checkBeExecVersion(); - toMasterProgress = "roll editlog"; - editLog.rollEditLog(); + toMasterProgress = "roll editlog"; + editLog.rollEditLog(); - if (Config.enable_advance_next_id) { - advanceNextId(); - } + if (Config.enable_advance_next_id) { + advanceNextId(); + } - // Log meta_version - long journalVersion = MetaContext.get().getMetaVersion(); - if (journalVersion < FeConstants.meta_version) { - toMasterProgress = "log meta version"; - editLog.logMetaVersion(FeConstants.meta_version); - MetaContext.get().setMetaVersion(FeConstants.meta_version); - } + // Log meta_version + long journalVersion = MetaContext.get().getMetaVersion(); + if (journalVersion < FeConstants.meta_version) { + toMasterProgress = "log meta version"; + editLog.logMetaVersion(FeConstants.meta_version); + MetaContext.get().setMetaVersion(FeConstants.meta_version); + } - // Log the first frontend - if (isFirstTimeStartUp) { - // if isFirstTimeStartUp is true, frontends must contains this Node. - Frontend self = frontends.get(nodeName); - Preconditions.checkNotNull(self); - // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false - editLog.logAddFirstFrontend(self); + // Log the first frontend + if (isFirstTimeStartUp) { + // if isFirstTimeStartUp is true, frontends must contains this Node. 
+ Frontend self = frontends.get(nodeName); + Preconditions.checkNotNull(self); + // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false + editLog.logAddFirstFrontend(self); - initLowerCaseTableNames(); - // Set initial root password if master FE first time launch. - auth.setInitialRootPassword(Config.initial_root_password); - } else { - if (journalVersion <= FeMetaVersion.VERSION_114) { - // if journal version is less than 114, which means it is upgraded from version before 2.0. - // When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged - // when switch to pipeline engine, otherwise it may impact the load of entire cluster - // because the default parallelism of pipeline engine is higher than previous version. - // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num - int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum; - VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, - String.valueOf(newVal)); - - // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x - // if the default value has been upgraded - double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor(); - VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", - SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, - String.valueOf(newBcFactorVal)); - - // similar reason as above, need to upgrade enable_nereids_planner to true - VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER, - "true"); - } - if (journalVersion <= FeMetaVersion.VERSION_123) { - VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true"); - VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", - SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none"); - if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) 
{ + initLowerCaseTableNames(); + // Set initial root password if master FE first time launch. + auth.setInitialRootPassword(Config.initial_root_password); + } else { + if (journalVersion <= FeMetaVersion.VERSION_114) { + // if journal version is less than 114, which means it is upgraded from version before 2.0. + // When upgrading from 1.2 to 2.0, + // we need to make sure that the parallelism of query remain unchanged + // when switch to pipeline engine, otherwise it may impact the load of entire cluster + // because the default parallelism of pipeline engine is higher than previous version. + // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num + int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum; + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", + SessionVariable.PARALLEL_PIPELINE_TASK_NUM, + String.valueOf(newVal)); + + // similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x + // if the default value has been upgraded + double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor(); + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", + SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR, + String.valueOf(newBcFactorVal)); + + // similar reason as above, need to upgrade enable_nereids_planner to true + VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER, + "true"); + } + if (journalVersion <= FeMetaVersion.VERSION_123) { + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, + "true"); VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", - SessionVariable.NEREIDS_TIMEOUT_SECOND, "30"); + SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none"); + if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) { + VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", + SessionVariable.NEREIDS_TIMEOUT_SECOND, "30"); + } } } - } - 
getPolicyMgr().createDefaultStoragePolicy(); + getPolicyMgr().createDefaultStoragePolicy(); - // MUST set master ip before starting checkpoint thread. - // because checkpoint thread need this info to select non-master FE to push image + // MUST set master ip before starting checkpoint thread. + // because checkpoint thread need this info to select non-master FE to push image - toMasterProgress = "log master info"; - this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(), - Config.http_port, - Config.rpc_port); - editLog.logMasterInfo(masterInfo); - LOG.info("logMasterInfo:{}", masterInfo); + toMasterProgress = "log master info"; + this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(), + Config.http_port, + Config.rpc_port); + editLog.logMasterInfo(masterInfo); + LOG.info("logMasterInfo:{}", masterInfo); - // for master, the 'isReady' is set behind. - // but we are sure that all metadata is replayed if we get here. - // so no need to check 'isReady' flag in this method - postProcessAfterMetadataReplayed(false); + // for master, the 'isReady' is set behind. + // but we are sure that all metadata is replayed if we get here. 
+ // so no need to check 'isReady' flag in this method + postProcessAfterMetadataReplayed(false); - insertOverwriteManager.allTaskFail(); + insertOverwriteManager.allTaskFail(); - toMasterProgress = "start daemon threads"; + toMasterProgress = "start daemon threads"; - // start all daemon threads that only running on MASTER FE - startMasterOnlyDaemonThreads(); - // start other daemon threads that should running on all FE - startNonMasterDaemonThreads(); + // start all daemon threads that only running on MASTER FE + startMasterOnlyDaemonThreads(); + // start other daemon threads that should running on all FE + startNonMasterDaemonThreads(); - MetricRepo.init(); + MetricRepo.init(); - toMasterProgress = "finished"; - canRead.set(true); - isReady.set(true); - checkLowerCaseTableNames(); + toMasterProgress = "finished"; + canRead.set(true); + isReady.set(true); + checkLowerCaseTableNames(); - String msg = "master finished to replay journal, can write now."; - Util.stdoutWithTime(msg); - LOG.info(msg); - // for master, there are some new thread pools need to register metric - ThreadPoolManager.registerAllThreadPoolMetric(); - if (analysisManager != null) { - analysisManager.getStatisticsCache().preHeat(); + String msg = "master finished to replay journal, can write now."; + Util.stdoutWithTime(msg); + LOG.info(msg); + // for master, there are some new thread pools need to register metric + ThreadPoolManager.registerAllThreadPoolMetric(); + if (analysisManager != null) { + analysisManager.getStatisticsCache().preHeat(); + } + } catch (Throwable e) { + // When failed to transfer to master, we need to exit the process. + // Otherwise, the process will be in an unknown state. + LOG.error("failed to transfer to master. 
progress: {}", toMasterProgress, e); + System.exit(-1); } } @@ -1778,41 +1788,48 @@ public class Env { private void transferToNonMaster(FrontendNodeType newType) { isReady.set(false); - if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) { - Preconditions.checkState(newType == FrontendNodeType.UNKNOWN); - LOG.warn("{} to UNKNOWN, still offer read service", feType.name()); - // not set canRead here, leave canRead as what is was. - // if meta out of date, canRead will be set to false in replayer thread. - metaReplayState.setTransferToUnknown(); - return; - } + try { + if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) { + Preconditions.checkState(newType == FrontendNodeType.UNKNOWN); + LOG.warn("{} to UNKNOWN, still offer read service", feType.name()); + // not set canRead here, leave canRead as what is was. + // if meta out of date, canRead will be set to false in replayer thread. + metaReplayState.setTransferToUnknown(); + return; + } - // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER + // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER - if (replayer == null) { - createReplayer(); - replayer.start(); - } + if (replayer == null) { + createReplayer(); + replayer.start(); + } - // 'isReady' will be set to true in 'setCanRead()' method - if (!postProcessAfterMetadataReplayed(true)) { - // the state has changed, exit early. - return; - } + // 'isReady' will be set to true in 'setCanRead()' method + if (!postProcessAfterMetadataReplayed(true)) { + // the state has changed, exit early. 
+ return; + } - checkLowerCaseTableNames(); + checkLowerCaseTableNames(); - startNonMasterDaemonThreads(); + startNonMasterDaemonThreads(); - MetricRepo.init(); + MetricRepo.init(); - if (analysisManager != null) { - analysisManager.getStatisticsCache().preHeat(); - } + if (analysisManager != null) { + analysisManager.getStatisticsCache().preHeat(); + } - if (followerColumnSender == null) { - followerColumnSender = new FollowerColumnSender(); - followerColumnSender.start(); + if (followerColumnSender == null) { + followerColumnSender = new FollowerColumnSender(); + followerColumnSender.start(); + } + } catch (Throwable e) { + // When failed to transfer to non-master, we need to exit the process. + // Otherwise, the process will be in an unknown state. + LOG.error("failed to transfer to non-master.", e); + System.exit(-1); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org