This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 40ed235dcca [opt](fe) exit FE when transfer to (non)master failed (#34809)
40ed235dcca is described below

commit 40ed235dccae5133a0d82aeab02c39f7652addb2
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sun May 19 08:15:26 2024 +0800

    [opt](fe) exit FE when transfer to (non)master failed (#34809)
    
    * [opt](fe) exit FE when transfer to (non)master failed
    
    * [opt](fe) exit FE when transfer to (non)master failed
    
    * [opt](fe) exit FE when transfer to (non)master failed
---
 be/src/vec/exec/scan/vfile_scanner.cpp             |   3 +-
 .../main/java/org/apache/doris/catalog/Env.java    | 301 +++++++++++----------
 2 files changed, 160 insertions(+), 144 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index e518e6068c7..421f7ab5b22 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -965,8 +965,7 @@ Status VFileScanner::_get_next_reader() {
             COUNTER_UPDATE(_empty_file_counter, 1);
             continue;
         } else if (!init_status.ok()) {
-            return Status::InternalError("failed to init reader for file {}, 
err: {}", range.path,
-                                         init_status.to_string());
+            return Status::InternalError("failed to init reader, err: {}", 
init_status.to_string());
         }
 
         _name_to_col_type.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 26e24c12ba0..77aa7a2f2b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1461,148 +1461,158 @@ public class Env {
 
     @SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
     private void transferToMaster() {
-        // stop replayer
-        if (replayer != null) {
-            replayer.exit();
-            try {
-                replayer.join();
-            } catch (InterruptedException e) {
-                LOG.warn("got exception when stopping the replayer thread", e);
+        try {
+            // stop replayer
+            if (replayer != null) {
+                replayer.exit();
+                try {
+                    replayer.join();
+                } catch (InterruptedException e) {
+                    LOG.warn("got exception when stopping the replayer 
thread", e);
+                }
+                replayer = null;
             }
-            replayer = null;
-        }
 
-        // set this after replay thread stopped. to avoid replay thread modify 
them.
-        isReady.set(false);
-        canRead.set(false);
+            // set this after replay thread stopped. to avoid replay thread 
modify them.
+            isReady.set(false);
+            canRead.set(false);
 
-        toMasterProgress = "open editlog";
-        editLog.open();
+            toMasterProgress = "open editlog";
+            editLog.open();
 
-        if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
-            if (!haProtocol.fencing()) {
-                LOG.error("fencing failed. will exit.");
-                System.exit(-1);
+            if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
+                if (!haProtocol.fencing()) {
+                    LOG.error("fencing failed. will exit.");
+                    System.exit(-1);
+                }
             }
-        }
 
-        toMasterProgress = "replay journal";
-        long replayStartTime = System.currentTimeMillis();
-        // replay journals. -1 means replay all the journals larger than 
current journal id.
-        replayJournal(-1);
-        long replayEndTime = System.currentTimeMillis();
-        LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " 
msec");
+            toMasterProgress = "replay journal";
+            long replayStartTime = System.currentTimeMillis();
+            // replay journals. -1 means replay all the journals larger than 
current journal id.
+            replayJournal(-1);
+            long replayEndTime = System.currentTimeMillis();
+            LOG.info("finish replay in " + (replayEndTime - replayStartTime) + 
" msec");
 
-        if (Config.enable_check_compatibility_mode) {
-            String msg = "check metadata compatibility successfully";
-            LOG.info(msg);
-            System.out.println(msg);
-            System.exit(0);
-        }
+            if (Config.enable_check_compatibility_mode) {
+                String msg = "check metadata compatibility successfully";
+                LOG.info(msg);
+                System.out.println(msg);
+                System.exit(0);
+            }
 
-        checkCurrentNodeExist();
+            checkCurrentNodeExist();
 
-        checkBeExecVersion();
+            checkBeExecVersion();
 
-        toMasterProgress = "roll editlog";
-        editLog.rollEditLog();
+            toMasterProgress = "roll editlog";
+            editLog.rollEditLog();
 
-        if (Config.enable_advance_next_id) {
-            advanceNextId();
-        }
+            if (Config.enable_advance_next_id) {
+                advanceNextId();
+            }
 
-        // Log meta_version
-        long journalVersion = MetaContext.get().getMetaVersion();
-        if (journalVersion < FeConstants.meta_version) {
-            toMasterProgress = "log meta version";
-            editLog.logMetaVersion(FeConstants.meta_version);
-            MetaContext.get().setMetaVersion(FeConstants.meta_version);
-        }
+            // Log meta_version
+            long journalVersion = MetaContext.get().getMetaVersion();
+            if (journalVersion < FeConstants.meta_version) {
+                toMasterProgress = "log meta version";
+                editLog.logMetaVersion(FeConstants.meta_version);
+                MetaContext.get().setMetaVersion(FeConstants.meta_version);
+            }
 
-        // Log the first frontend
-        if (isFirstTimeStartUp) {
-            // if isFirstTimeStartUp is true, frontends must contains this 
Node.
-            Frontend self = frontends.get(nodeName);
-            Preconditions.checkNotNull(self);
-            // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even 
if canWrite is false
-            editLog.logAddFirstFrontend(self);
+            // Log the first frontend
+            if (isFirstTimeStartUp) {
+                // if isFirstTimeStartUp is true, frontends must contains this 
Node.
+                Frontend self = frontends.get(nodeName);
+                Preconditions.checkNotNull(self);
+                // OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE 
even if canWrite is false
+                editLog.logAddFirstFrontend(self);
 
-            initLowerCaseTableNames();
-            // Set initial root password if master FE first time launch.
-            auth.setInitialRootPassword(Config.initial_root_password);
-        } else {
-            if (journalVersion <= FeMetaVersion.VERSION_114) {
-                // if journal version is less than 114, which means it is 
upgraded from version before 2.0.
-                // When upgrading from 1.2 to 2.0, we need to make sure that 
the parallelism of query remain unchanged
-                // when switch to pipeline engine, otherwise it may impact the 
load of entire cluster
-                // because the default parallelism of pipeline engine is 
higher than previous version.
-                // so set parallel_pipeline_task_num to 
parallel_fragment_exec_instance_num
-                int newVal = 
VariableMgr.newSessionVariable().parallelExecInstanceNum;
-                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", 
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
-                        String.valueOf(newVal));
-
-                // similar reason as above, need to upgrade broadcast scale 
factor during 1.2 to 2.x
-                // if the default value has been upgraded
-                double newBcFactorVal = 
VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
-                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
-                        SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
-                        String.valueOf(newBcFactorVal));
-
-                // similar reason as above, need to upgrade 
enable_nereids_planner to true
-                VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", 
SessionVariable.ENABLE_NEREIDS_PLANNER,
-                        "true");
-            }
-            if (journalVersion <= FeMetaVersion.VERSION_123) {
-                VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", 
SessionVariable.ENABLE_NEREIDS_DML, "true");
-                VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
-                        
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
-                if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 
5) {
+                initLowerCaseTableNames();
+                // Set initial root password if master FE first time launch.
+                auth.setInitialRootPassword(Config.initial_root_password);
+            } else {
+                if (journalVersion <= FeMetaVersion.VERSION_114) {
+                    // if journal version is less than 114, which means it is 
upgraded from version before 2.0.
+                    // When upgrading from 1.2 to 2.0,
+                    // we need to make sure that the parallelism of query 
remain unchanged
+                    // when switch to pipeline engine, otherwise it may impact 
the load of entire cluster
+                    // because the default parallelism of pipeline engine is 
higher than previous version.
+                    // so set parallel_pipeline_task_num to 
parallel_fragment_exec_instance_num
+                    int newVal = 
VariableMgr.newSessionVariable().parallelExecInstanceNum;
+                    VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+                            SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
+                            String.valueOf(newVal));
+
+                    // similar reason as above, need to upgrade broadcast 
scale factor during 1.2 to 2.x
+                    // if the default value has been upgraded
+                    double newBcFactorVal = 
VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
+                    VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
+                            SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
+                            String.valueOf(newBcFactorVal));
+
+                    // similar reason as above, need to upgrade 
enable_nereids_planner to true
+                    VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", 
SessionVariable.ENABLE_NEREIDS_PLANNER,
+                            "true");
+                }
+                if (journalVersion <= FeMetaVersion.VERSION_123) {
+                    VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", 
SessionVariable.ENABLE_NEREIDS_DML,
+                            "true");
                     VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
-                            SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
+                            
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
+                    if (VariableMgr.newSessionVariable().nereidsTimeoutSecond 
== 5) {
+                        VariableMgr.refreshDefaultSessionVariables("2.0 to 
2.1",
+                                SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
+                    }
                 }
             }
-        }
 
-        getPolicyMgr().createDefaultStoragePolicy();
+            getPolicyMgr().createDefaultStoragePolicy();
 
-        // MUST set master ip before starting checkpoint thread.
-        // because checkpoint thread need this info to select non-master FE to 
push image
+            // MUST set master ip before starting checkpoint thread.
+            // because checkpoint thread need this info to select non-master 
FE to push image
 
-        toMasterProgress = "log master info";
-        this.masterInfo = new 
MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
-                Config.http_port,
-                Config.rpc_port);
-        editLog.logMasterInfo(masterInfo);
-        LOG.info("logMasterInfo:{}", masterInfo);
+            toMasterProgress = "log master info";
+            this.masterInfo = new 
MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
+                    Config.http_port,
+                    Config.rpc_port);
+            editLog.logMasterInfo(masterInfo);
+            LOG.info("logMasterInfo:{}", masterInfo);
 
-        // for master, the 'isReady' is set behind.
-        // but we are sure that all metadata is replayed if we get here.
-        // so no need to check 'isReady' flag in this method
-        postProcessAfterMetadataReplayed(false);
+            // for master, the 'isReady' is set behind.
+            // but we are sure that all metadata is replayed if we get here.
+            // so no need to check 'isReady' flag in this method
+            postProcessAfterMetadataReplayed(false);
 
-        insertOverwriteManager.allTaskFail();
+            insertOverwriteManager.allTaskFail();
 
-        toMasterProgress = "start daemon threads";
+            toMasterProgress = "start daemon threads";
 
-        // start all daemon threads that only running on MASTER FE
-        startMasterOnlyDaemonThreads();
-        // start other daemon threads that should running on all FE
-        startNonMasterDaemonThreads();
+            // start all daemon threads that only running on MASTER FE
+            startMasterOnlyDaemonThreads();
+            // start other daemon threads that should running on all FE
+            startNonMasterDaemonThreads();
 
-        MetricRepo.init();
+            MetricRepo.init();
 
-        toMasterProgress = "finished";
-        canRead.set(true);
-        isReady.set(true);
-        checkLowerCaseTableNames();
+            toMasterProgress = "finished";
+            canRead.set(true);
+            isReady.set(true);
+            checkLowerCaseTableNames();
 
-        String msg = "master finished to replay journal, can write now.";
-        Util.stdoutWithTime(msg);
-        LOG.info(msg);
-        // for master, there are some new thread pools need to register metric
-        ThreadPoolManager.registerAllThreadPoolMetric();
-        if (analysisManager != null) {
-            analysisManager.getStatisticsCache().preHeat();
+            String msg = "master finished to replay journal, can write now.";
+            Util.stdoutWithTime(msg);
+            LOG.info(msg);
+            // for master, there are some new thread pools need to register 
metric
+            ThreadPoolManager.registerAllThreadPoolMetric();
+            if (analysisManager != null) {
+                analysisManager.getStatisticsCache().preHeat();
+            }
+        } catch (Throwable e) {
+            // When failed to transfer to master, we need to exit the process.
+            // Otherwise, the process will be in an unknown state.
+            LOG.error("failed to transfer to master. progress: {}", 
toMasterProgress, e);
+            System.exit(-1);
         }
     }
 
@@ -1778,41 +1788,48 @@ public class Env {
     private void transferToNonMaster(FrontendNodeType newType) {
         isReady.set(false);
 
-        if (feType == FrontendNodeType.OBSERVER || feType == 
FrontendNodeType.FOLLOWER) {
-            Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
-            LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
-            // not set canRead here, leave canRead as what is was.
-            // if meta out of date, canRead will be set to false in replayer 
thread.
-            metaReplayState.setTransferToUnknown();
-            return;
-        }
+        try {
+            if (feType == FrontendNodeType.OBSERVER || feType == 
FrontendNodeType.FOLLOWER) {
+                Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
+                LOG.warn("{} to UNKNOWN, still offer read service", 
feType.name());
+                // not set canRead here, leave canRead as what is was.
+                // if meta out of date, canRead will be set to false in 
replayer thread.
+                metaReplayState.setTransferToUnknown();
+                return;
+            }
 
-        // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
+            // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
 
-        if (replayer == null) {
-            createReplayer();
-            replayer.start();
-        }
+            if (replayer == null) {
+                createReplayer();
+                replayer.start();
+            }
 
-        // 'isReady' will be set to true in 'setCanRead()' method
-        if (!postProcessAfterMetadataReplayed(true)) {
-            // the state has changed, exit early.
-            return;
-        }
+            // 'isReady' will be set to true in 'setCanRead()' method
+            if (!postProcessAfterMetadataReplayed(true)) {
+                // the state has changed, exit early.
+                return;
+            }
 
-        checkLowerCaseTableNames();
+            checkLowerCaseTableNames();
 
-        startNonMasterDaemonThreads();
+            startNonMasterDaemonThreads();
 
-        MetricRepo.init();
+            MetricRepo.init();
 
-        if (analysisManager != null) {
-            analysisManager.getStatisticsCache().preHeat();
-        }
+            if (analysisManager != null) {
+                analysisManager.getStatisticsCache().preHeat();
+            }
 
-        if (followerColumnSender == null) {
-            followerColumnSender = new FollowerColumnSender();
-            followerColumnSender.start();
+            if (followerColumnSender == null) {
+                followerColumnSender = new FollowerColumnSender();
+                followerColumnSender.start();
+            }
+        } catch (Throwable e) {
+            // When failed to transfer to non-master, we need to exit the 
process.
+            // Otherwise, the process will be in an unknown state.
+            LOG.error("failed to transfer to non-master.", e);
+            System.exit(-1);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to