This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 10abbd2b62 [Feature](Export) support parallel export job using Job Schedule (#22854)
10abbd2b62 is described below

commit 10abbd2b620cf4a39df734d8471f87ea1d58f906
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Fri Aug 18 22:24:42 2023 +0800

    [Feature](Export) support parallel export job using Job Schedule (#22854)
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../apache/doris/analysis/CancelExportStmt.java    |  12 +-
 .../java/org/apache/doris/analysis/ExportStmt.java | 230 ++++---
 .../org/apache/doris/analysis/OutFileClause.java   |   2 +-
 .../org/apache/doris/analysis/ShowExportStmt.java  |   8 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../org/apache/doris/common/proc/JobsProcDir.java  |  18 +-
 .../org/apache/doris/journal/JournalEntity.java    |   3 +-
 .../main/java/org/apache/doris/load/ExportJob.java | 702 +++++++++------------
 .../java/org/apache/doris/load/ExportJobState.java |  46 ++
 .../apache/doris/load/ExportJobStateTransfer.java  |  88 +++
 .../main/java/org/apache/doris/load/ExportMgr.java | 110 ++--
 .../org/apache/doris/load/ExportTaskExecutor.java  | 171 +++++
 .../java/org/apache/doris/load/OutfileInfo.java    |  37 ++
 .../java/org/apache/doris/persist/EditLog.java     |   8 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |   6 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   3 +-
 .../org/apache/doris/task/ExportExportingTask.java |  24 +-
 .../doris/analysis/CancelExportStmtTest.java       |  16 +-
 .../org/apache/doris/qe/SessionVariablesTest.java  |  13 +-
 .../suites/export_p0/test_export_basic.groovy      |   3 +-
 .../suites/export_p2/test_export_with_hdfs.groovy  |   8 +-
 .../suites/export_p2/test_export_with_s3.groovy    |   9 +-
 23 files changed, 906 insertions(+), 618 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2819439c15..c2dc625829 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2102,6 +2102,11 @@ public class Config extends ConfigBase {
             "The maximum parallelism allowed by Export job"})
     public static int maximum_parallelism_of_export_job = 50;
 
+    @ConfField(mutable = true, description = {
+            "ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
+            "The maximum number of tablets allowed by an OutfileStatement in 
an ExportExecutorTask"})
+    public static int maximum_tablets_of_outfile_in_export = 10;
+
     @ConfField(mutable = true, description = {
             "是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
             "Whether to use mysql's bigint type to return Doris's largeint 
type"})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
index ca55761420..cdd2ecf73c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
@@ -21,8 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.ExportJob;
-import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJobState;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -83,11 +82,10 @@ public class CancelExportStmt extends DdlStmt {
                 throw new AnalysisException("Only label can use like");
             }
             state = inputValue;
-            ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
-            if (jobState != ExportJob.JobState.PENDING
-                    && jobState != JobState.IN_QUEUE
-                    && jobState != ExportJob.JobState.EXPORTING) {
-                throw new AnalysisException("Only support PENDING/IN_QUEUE/EXPORTING, invalid state: " + state);
+            ExportJobState jobState = ExportJobState.valueOf(state);
+            if (jobState != ExportJobState.PENDING
+                    && jobState != ExportJobState.EXPORTING) {
+                throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
             }
         }
     }
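
The nested ExportJob.JobState enum is replaced by a top-level ExportJobState
(the new fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java is
listed in the diffstat but not shown in full in this excerpt). Judging from the
states referenced throughout this patch, it presumably keeps the same five
values, with IN_QUEUE retained only so that metadata written by older versions
can still be replayed:

    // Presumed shape of the new enum, inferred from its usages in this patch.
    public enum ExportJobState {
        PENDING,
        IN_QUEUE,   // kept for replaying edit logs from older versions
        EXPORTING,
        FINISHED,
        CANCELLED,
    }
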
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 930c3121b9..69f4d7174f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -17,13 +17,13 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeNameFormat;
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.load.ExportJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -39,25 +40,26 @@ import org.apache.doris.qe.VariableMgr;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import lombok.Getter;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 
 // EXPORT statement, export data to dirs by broker.
 //
 // syntax:
-//      EXPORT TABLE tablename [PARTITION (name1[, ...])]
+//      EXPORT TABLE table_name [PARTITION (name1[, ...])]
 //          TO 'export_target_path'
 //          [PROPERTIES("key"="value")]
 //          BY BROKER 'broker_name' [( $broker_attrs)]
+@Getter
 public class ExportStmt extends StatementBase {
-    private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
     public static final String PARALLELISM = "parallelism";
     public static final String LABEL = "label";
 
@@ -106,6 +108,8 @@ public class ExportStmt extends StatementBase {
 
     private UserIdentity userIdentity;
 
+    private ExportJob exportJob;
+
     public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
                       Map<String, String> properties, BrokerDesc brokerDesc) {
         this.tableRef = tableRef;
@@ -118,75 +122,15 @@ public class ExportStmt extends StatementBase {
         this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
         this.lineDelimiter = DEFAULT_LINE_DELIMITER;
         this.columns = DEFAULT_COLUMNS;
-        if (ConnectContext.get() != null) {
-            this.sessionVariables = ConnectContext.get().getSessionVariable();
-        } else {
-            this.sessionVariables = VariableMgr.getDefaultSessionVariable();
-        }
-    }
-
-    public String getColumns() {
-        return columns;
-    }
-
-    public TableName getTblName() {
-        return tblName;
-    }
-
-    public List<String> getPartitions() {
-        return partitionStringNames;
-    }
-
-    public Expr getWhereExpr() {
-        return whereExpr;
-    }
-
-    public String getPath() {
-        return path;
-    }
-
-    public BrokerDesc getBrokerDesc() {
-        return brokerDesc;
-    }
-
-    public String getColumnSeparator() {
-        return this.columnSeparator;
-    }
-
-    public String getLineDelimiter() {
-        return this.lineDelimiter;
-    }
-
-    public TableRef getTableRef() {
-        return this.tableRef;
-    }
-
-    public String getFormat() {
-        return format;
-    }
-
-    public String getLabel() {
-        return label;
-    }
-
-    public SessionVariable getSessionVariables() {
-        return sessionVariables;
-    }
 
-    public String getQualifiedUser() {
-        return qualifiedUser;
-    }
-
-    public UserIdentity getUserIdentity() {
-        return this.userIdentity;
+        Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable(
+                ConnectContext.get().getSessionVariable());
+        this.sessionVariables = optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable());
     }
 
     @Override
     public boolean needAuditEncryption() {
-        if (brokerDesc != null) {
-            return true;
-        }
-        return false;
+        return brokerDesc != null;
     }
 
     @Override
@@ -197,16 +141,17 @@ public class ExportStmt extends StatementBase {
         Preconditions.checkNotNull(tableRef);
         tableRef.analyze(analyzer);
 
-        this.tblName = tableRef.getName();
         // disallow external catalog
+        tblName = tableRef.getName();
        Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
 
-        PartitionNames partitionNames = tableRef.getPartitionNames();
-        if (partitionNames != null) {
-            if (partitionNames.isTemp()) {
+        // get partitions name
+        Optional<PartitionNames> optionalPartitionNames = Optional.ofNullable(tableRef.getPartitionNames());
+        if (optionalPartitionNames.isPresent()) {
+            if (optionalPartitionNames.get().isTemp()) {
                 throw new AnalysisException("Do not support exporting temporary partitions");
             }
-            partitionStringNames = partitionNames.getPartitionNames();
+            partitionStringNames = optionalPartitionNames.get().getPartitionNames();
         }
 
         // check auth
@@ -222,7 +167,7 @@ public class ExportStmt extends StatementBase {
         userIdentity = ConnectContext.get().getCurrentUserIdentity();
 
         // check table && partitions whether exist
-        checkTable(analyzer.getEnv());
+        checkPartitions(analyzer.getEnv());
 
         // check broker whether exist
         if (brokerDesc == null) {
@@ -232,28 +177,86 @@ public class ExportStmt extends StatementBase {
         // check path is valid
         path = checkPath(path, brokerDesc.getStorageType());
         if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
-            if (!analyzer.getEnv().getBrokerMgr().containsBroker(brokerDesc.getName())) {
+            BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr();
+            if (!brokerMgr.containsBroker(brokerDesc.getName())) {
                 throw new AnalysisException("broker " + brokerDesc.getName() + 
" does not exist");
             }
-
-            FsBroker broker = analyzer.getEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName());
-            if (broker == null) {
+            if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) {
                 throw new AnalysisException("failed to get alive broker");
             }
         }
 
         // check properties
         checkProperties(properties);
+
+        // create job and analyze job
+        setJob();
+        exportJob.analyze();
     }
 
-    private void checkTable(Env env) throws AnalysisException {
+    private void setJob() throws UserException {
+        exportJob = new ExportJob();
+
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
+        exportJob.setDbId(db.getId());
+        exportJob.setTableName(this.tblName);
+        exportJob.setExportTable(db.getTableOrDdlException(this.tblName.getTbl()));
+        exportJob.setTableId(db.getTableOrDdlException(this.tblName.getTbl()).getId());
+        exportJob.setTableRef(this.tableRef);
+
+        // set partitions
+        exportJob.setPartitionNames(this.partitionStringNames);
+
+        // set where expr
+        exportJob.setWhereExpr(this.whereExpr);
+
+        // set path
+        exportJob.setExportPath(this.path);
+
+        // set properties
+        exportJob.setLabel(this.label);
+        exportJob.setColumnSeparator(this.columnSeparator);
+        exportJob.setLineDelimiter(this.lineDelimiter);
+        exportJob.setFormat(this.format);
+        exportJob.setColumns(this.columns);
+        exportJob.setParallelism(this.parallelism);
+        exportJob.setMaxFileSize(this.maxFileSize);
+        exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
+
+        if (!Strings.isNullOrEmpty(this.columns)) {
+            Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
+            exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase()));
+        }
+
+        // set broker desc
+        exportJob.setBrokerDesc(this.brokerDesc);
+
+        // set sessions
+        exportJob.setQualifiedUser(this.qualifiedUser);
+        exportJob.setUserIdentity(this.userIdentity);
+        exportJob.setSessionVariables(this.sessionVariables);
+        exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
+
+        exportJob.setSql(this.toSql());
+        exportJob.setOrigStmt(this.getOrigStmt());
+    }
+
+    // check partitions specified by user are belonged to the table.
+    private void checkPartitions(Env env) throws AnalysisException {
+        if (partitionStringNames == null) {
+            return;
+        }
+
+        if (partitionStringNames.size() > Config.maximum_number_of_export_partitions) {
+            throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
+                    + " of partitions allowed by an export job");
+        }
+
         Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
         Table table = db.getTableOrAnalysisException(tblName.getTbl());
         table.readLock();
         try {
-            if (partitionStringNames == null) {
-                return;
-            }
+            // check table
             if (!table.isPartitioned()) {
                 throw new AnalysisException("Table[" + tblName.getTbl() + "] 
is not partitioned.");
             }
@@ -270,13 +273,14 @@ public class ExportStmt extends StatementBase {
                 case VIEW:
                 default:
                     throw new AnalysisException("Table[" + tblName.getTbl() + 
"] is "
-                            + tblType.toString() + " type, do not support 
EXPORT.");
+                            + tblType + " type, do not support EXPORT.");
             }
 
             for (String partitionName : partitionStringNames) {
                 Partition partition = table.getPartition(partitionName);
                 if (partition == null) {
-                    throw new AnalysisException("Partition [" + partitionName 
+ "] does not exist");
+                    throw new AnalysisException("Partition [" + partitionName 
+ "] does not exist "
+                            + "in Table[" + tblName.getTbl() + "]");
                 }
             }
         } finally {
@@ -286,13 +290,17 @@ public class ExportStmt extends StatementBase {
 
    public static String checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
         if (Strings.isNullOrEmpty(path)) {
-            throw new AnalysisException("No dest path specified.");
+            throw new AnalysisException("No destination path specified.");
         }
 
         URI uri = URI.create(path);
         String schema = uri.getScheme();
+        if (schema == null) {
+            throw new AnalysisException(
+                    "Invalid export path, there is no schema of URI found. 
please check your path.");
+        }
         if (type == StorageBackend.StorageType.BROKER) {
-            if (schema == null || (!schema.equalsIgnoreCase("bos")
+            if (!schema.equalsIgnoreCase("bos")
                     && !schema.equalsIgnoreCase("afs")
                     && !schema.equalsIgnoreCase("hdfs")
                     && !schema.equalsIgnoreCase("ofs")
@@ -302,23 +310,17 @@ public class ExportStmt extends StatementBase {
                     && !schema.equalsIgnoreCase("cosn")
                     && !schema.equalsIgnoreCase("gfs")
                     && !schema.equalsIgnoreCase("jfs")
-                    && !schema.equalsIgnoreCase("gs"))) {
+                    && !schema.equalsIgnoreCase("gs")) {
                 throw new AnalysisException("Invalid broker path. please use 
valid 'hdfs://', 'afs://' , 'bos://',"
                         + " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 
'gfs://', 'gs://' or 'jfs://' path.");
             }
-        } else if (type == StorageBackend.StorageType.S3) {
-            if (schema == null || !schema.equalsIgnoreCase("s3")) {
-                throw new AnalysisException("Invalid export path. please use 
valid 's3://' path.");
-            }
-        } else if (type == StorageBackend.StorageType.HDFS) {
-            if (schema == null || !schema.equalsIgnoreCase("hdfs")) {
-                throw new AnalysisException("Invalid export path. please use 
valid 'HDFS://' path.");
-            }
-        } else if (type == StorageBackend.StorageType.LOCAL) {
-            if (schema != null && !schema.equalsIgnoreCase("file")) {
-                throw new AnalysisException(
+        } else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
+            throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
+        } else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")) {
+            throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
+        } else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) {
+            throw new AnalysisException(
                        "Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
-            }
         }
         return path;
     }
@@ -326,7 +328,7 @@ public class ExportStmt extends StatementBase {
    private void checkProperties(Map<String, String> properties) throws UserException {
         for (String key : properties.keySet()) {
             if (!PROPERTIES_SET.contains(key.toLowerCase())) {
-                throw new DdlException("Invalid property key: '" + key + "'");
+                throw new UserException("Invalid property key: [" + key + "]");
             }
         }
 
@@ -348,20 +350,24 @@ public class ExportStmt extends StatementBase {
 
         // parallelism
        String parallelismString = properties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM);
-        parallelism = Integer.parseInt(parallelismString);
+        try {
+            this.parallelism = Integer.parseInt(parallelismString);
+        } catch (NumberFormatException e) {
+            throw new UserException("The value of parallelism is invalid!");
+        }
 
         // max_file_size
        this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
        this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");
 
+        // label
         if (properties.containsKey(LABEL)) {
             FeNameFormat.checkLabel(properties.get(LABEL));
+            this.label = properties.get(LABEL);
         } else {
             // generate a random label
-            String label = "export_" + UUID.randomUUID();
-            properties.put(LABEL, label);
+            this.label = "export_" + UUID.randomUUID();
         }
-        label = properties.get(LABEL);
     }
 
     @Override
@@ -408,16 +414,4 @@ public class ExportStmt extends StatementBase {
     public String toString() {
         return toSql();
     }
-
-    public String getMaxFileSize() {
-        return maxFileSize;
-    }
-
-    public String getDeleteExistingFiles() {
-        return deleteExistingFiles;
-    }
-
-    public Integer getParallelNum() {
-        return parallelism;
-    }
 }
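
For reference, checkProperties() above now fails fast on a malformed
parallelism value and generates a label when none is supplied. A hypothetical
caller's view of the behaviour (illustrative only, not code from this patch):

    // Illustrative only: what the rewritten checkProperties() accepts.
    Map<String, String> properties = Maps.newHashMap();
    properties.put("parallelism", "4");   // parsed via Integer.parseInt; a bad value
                                          // now surfaces as UserException rather than
                                          // an unchecked NumberFormatException
    // no "label" key -> this.label = "export_" + UUID.randomUUID()
    // any unknown key -> UserException("Invalid property key: [...]")
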
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 24ed977a7d..d2f760d5b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -255,7 +255,7 @@ public class OutFileClause {
         if (brokerDesc != null && isLocalOutput) {
             throw new AnalysisException("No need to specify BROKER properties 
in OUTFILE clause for local file output");
         } else if (brokerDesc == null && !isLocalOutput) {
-            throw new AnalysisException("Must specify BROKER properties in 
OUTFILE clause");
+            throw new AnalysisException("Must specify BROKER properties or 
current local file path in OUTFILE clause");
         }
         isAnalyzed = true;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
index d826da8340..eb2b548b92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
@@ -27,7 +27,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.proc.ExportProcNode;
 import org.apache.doris.common.util.OrderByPair;
-import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.qe.ShowResultSetMetaData;
 
 import com.google.common.base.Strings;
@@ -54,7 +54,7 @@ public class ShowExportStmt extends ShowStmt {
     private boolean isLabelUseLike = false;
     private String stateValue = null;
 
-    private JobState jobState;
+    private ExportJobState jobState;
 
     private ArrayList<OrderByPair> orderByPairs;
 
@@ -84,7 +84,7 @@ public class ShowExportStmt extends ShowStmt {
         return this.jobId;
     }
 
-    public JobState getJobState() {
+    public ExportJobState getJobState() {
         if (Strings.isNullOrEmpty(stateValue)) {
             return null;
         }
@@ -152,7 +152,7 @@ public class ShowExportStmt extends ShowStmt {
                     if (!Strings.isNullOrEmpty(value)) {
                         stateValue = value.toUpperCase();
                         try {
-                            jobState = JobState.valueOf(stateValue);
+                            jobState = ExportJobState.valueOf(stateValue);
                             valid = true;
                         } catch (IllegalArgumentException e) {
                             LOG.warn("illegal state argument in export stmt. 
stateValue={}, error={}", stateValue, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0611016964..041db9b430 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3755,7 +3755,7 @@ public class Env {
         return timerJobManager;
     }
 
-    public TransientTaskManager getMemoryTaskManager() {
+    public TransientTaskManager getTransientTaskManager() {
         return transientTaskManager;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
index b81561399d..781ee38d52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
@@ -22,7 +22,7 @@ import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.loadv2.LoadManager;
 
@@ -147,10 +147,10 @@ public class JobsProcDir implements ProcDirInterface {
 
         // export
         ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
-        pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING, dbId);
-        runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING, dbId);
-        finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED, dbId);
-        cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED, dbId);
+        pendingNum = exportMgr.getJobNum(ExportJobState.PENDING, dbId);
+        runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING, dbId);
+        finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED, dbId);
+        cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED, dbId);
         totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
        result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
                 cancelledNum.toString(), totalNum.toString()));
@@ -209,10 +209,10 @@ public class JobsProcDir implements ProcDirInterface {
 
         // export
         ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
-        pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING);
-        runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING);
-        finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED);
-        cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED);
+        pendingNum = exportMgr.getJobNum(ExportJobState.PENDING);
+        runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING);
+        finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED);
+        cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED);
         totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
        result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
                 cancelledNum.toString(), totalNum.toString()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index e155751d5b..d8b8c62bd5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -47,6 +47,7 @@ import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.bdbje.Timestamp;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobStateTransfer;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@@ -319,7 +320,7 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             case OperationType.OP_EXPORT_UPDATE_STATE:
-                data = ExportJob.StateTransfer.read(in);
+                data = ExportJobStateTransfer.read(in);
                 isRead = true;
                 break;
             case OperationType.OP_FINISH_DELETE: {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 1527ba1ce4..bd975bb6fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -48,17 +48,17 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.registry.ExportTaskRegister;
+import org.apache.doris.scheduler.registry.TransientTaskRegister;
 import org.apache.doris.task.ExportExportingTask;
 import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -66,7 +66,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
-import lombok.Getter;
+import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -81,27 +81,21 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
-// NOTE: we must be carefully if we send next request
-//       as soon as receiving one instance's report from one BE,
-//       because we may change job's member concurrently.
+@Data
 public class ExportJob implements Writable {
     private static final Logger LOG = LogManager.getLogger(ExportJob.class);
 
     private static final String BROKER_PROPERTY_PREFIXES = "broker.";
 
-    public enum JobState {
-        PENDING,
-        IN_QUEUE,
-        EXPORTING,
-        FINISHED,
-        CANCELLED,
-    }
+    private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export;
+
+    public static final TransientTaskRegister register = new ExportTaskRegister(
+            Env.getCurrentEnv().getTransientTaskManager());
 
     @SerializedName("id")
     private long id;
-    @SerializedName("queryId")
-    private String queryId;
     @SerializedName("label")
     private String label;
     @SerializedName("dbId")
@@ -121,7 +115,7 @@ public class ExportJob implements Writable {
     @SerializedName("tableName")
     private TableName tableName;
     @SerializedName("state")
-    private JobState state;
+    private ExportJobState state;
     @SerializedName("createTimeMs")
     private long createTimeMs;
    // this is the origin stmt of ExportStmt, we use it to persist where expr of Export job,
@@ -153,15 +147,19 @@ public class ExportJob implements Writable {
     // progress has two functions at EXPORTING stage:
     // 1. when progress < 100, it indicates exporting
     // 2. set progress = 100 ONLY when exporting progress is completely done
+    @SerializedName("progress")
     private int progress;
 
+    @SerializedName("tabletsNum")
+    private Integer tabletsNum;
+
     private TableRef tableRef;
 
     private Expr whereExpr;
 
     private String sql = "";
 
-    private Integer parallelNum;
+    private Integer parallelism;
 
     public Map<String, Long> getPartitionToVersion() {
         return partitionToVersion;
@@ -170,9 +168,11 @@ public class ExportJob implements Writable {
     private Map<String, Long> partitionToVersion = Maps.newHashMap();
 
     // The selectStmt is sql 'select ... into outfile ...'
-    @Getter
+    // TODO(ftw): delete
     private List<SelectStmt> selectStmtList = Lists.newArrayList();
 
+    private List<List<SelectStmt>> selectStmtListPerParallel = Lists.newArrayList();
+
     private List<StmtExecutor> stmtExecutorList;
 
     private List<String> exportColumns = Lists.newArrayList();
@@ -188,16 +188,21 @@ public class ExportJob implements Writable {
 
     private ExportExportingTask task;
 
-    private List<TScanRangeLocations> tabletLocations = Lists.newArrayList();
     // backend_address => snapshot path
    private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
 
+    private List<ExportTaskExecutor> jobExecutorList;
+
+    private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
+
+    private Integer finishedTaskCount = 0;
+    private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
+
     public ExportJob() {
         this.id = -1;
-        this.queryId = "";
         this.dbId = -1;
         this.tableId = -1;
-        this.state = JobState.PENDING;
+        this.state = ExportJobState.PENDING;
         this.progress = 0;
         this.createTimeMs = System.currentTimeMillis();
         this.startTimeMs = -1;
@@ -215,55 +220,37 @@ public class ExportJob implements Writable {
         this.id = jobId;
     }
 
-    public void setJob(ExportStmt stmt) throws UserException {
-        String dbName = stmt.getTblName().getDb();
-        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-        Preconditions.checkNotNull(stmt.getBrokerDesc());
-        this.brokerDesc = stmt.getBrokerDesc();
-        this.columnSeparator = stmt.getColumnSeparator();
-        this.lineDelimiter = stmt.getLineDelimiter();
-        this.label = stmt.getLabel();
-        this.queryId = ConnectContext.get() != null ? DebugUtil.printId(ConnectContext.get().queryId()) : "N/A";
-        String path = stmt.getPath();
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
-        this.whereExpr = stmt.getWhereExpr();
-        this.parallelNum = stmt.getParallelNum();
-        this.exportPath = path;
-        this.sessionVariables = stmt.getSessionVariables();
-        this.timeoutSecond = sessionVariables.getQueryTimeoutS();
-
-        this.qualifiedUser = stmt.getQualifiedUser();
-        this.userIdentity = stmt.getUserIdentity();
-        this.format = stmt.getFormat();
-        this.maxFileSize = stmt.getMaxFileSize();
-        this.deleteExistingFiles = stmt.getDeleteExistingFiles();
-        this.partitionNames = stmt.getPartitions();
-
-        this.exportTable = db.getTableOrDdlException(stmt.getTblName().getTbl());
-        this.columns = stmt.getColumns();
-        this.tableRef = stmt.getTableRef();
-        if (!Strings.isNullOrEmpty(this.columns)) {
-            Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
-            this.exportColumns = split.splitToList(stmt.getColumns().toLowerCase());
-        }
+    /**
+     * For an ExportJob:
+     * The ExportJob is divided into multiple 'ExportTaskExecutor'
+     * according to the 'parallelism' set by the user.
+     * The tablets which will be exported by this ExportJob are divided into 'parallelism' copies,
+     * and each ExportTaskExecutor is responsible for a list of tablets.
+     * The tablets an ExportTaskExecutor is responsible for will be assigned to multiple OutfileStmts
+     * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
+     *
+     * @throws UserException
+     */
+    public void analyze() throws UserException {
         exportTable.readLock();
         try {
-            this.dbId = db.getId();
-            this.tableId = exportTable.getId();
-            this.tableName = stmt.getTblName();
-            if (selectStmtList.isEmpty()) {
-                // This scenario is used for 'EXPORT TABLE tbl INTO PATH'
-                // we need generate Select Statement
-                generateQueryStmt(stmt);
-            }
+            // generateQueryStmtOld
+            generateQueryStmt();
         } finally {
             exportTable.readUnlock();
         }
-        this.sql = stmt.toSql();
-        this.origStmt = stmt.getOrigStmt();
+        generateExportJobExecutor();
+    }
+
+    public void generateExportJobExecutor() {
+        jobExecutorList = Lists.newArrayList();
+        for (List<SelectStmt> selectStmts : selectStmtListPerParallel) {
+            ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this);
+            jobExecutorList.add(executor);
+        }
     }
 
-    private void generateQueryStmt(ExportStmt stmt) throws UserException {
+    private void generateQueryStmtOld() throws UserException {
         SelectList list = new SelectList();
         if (exportColumns.isEmpty()) {
             list.addItem(SelectListItem.createStarItem(this.tableName));
@@ -278,7 +265,16 @@ public class ExportJob implements Writable {
             }
         }
 
-        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = splitTablets(stmt);
+        ArrayList<ArrayList<Long>> tabletsListPerQuery = splitTablets();
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList();
+        for (ArrayList<Long> tabletsList : tabletsListPerQuery) {
+            TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tabletsList,
+                    this.tableRef.getTableSample(), this.tableRef.getCommonHints());
+            ArrayList<TableRef> tableRefList = Lists.newArrayList();
+            tableRefList.add(tblRef);
+            tableRefListPerQuery.add(tableRefList);
+        }
         LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
 
         if (LOG.isDebugEnabled()) {
@@ -306,30 +302,104 @@ public class ExportJob implements Writable {
         }
     }
 
-    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) throws UserException {
+    /**
+     * Generate outfile select stmt
+     * @throws UserException
+     */
+    private void generateQueryStmt() throws UserException {
+        SelectList list = new SelectList();
+        if (exportColumns.isEmpty()) {
+            list.addItem(SelectListItem.createStarItem(this.tableName));
+        } else {
+            for (Column column : exportTable.getBaseSchema()) {
+                String colName = column.getName().toLowerCase();
+                if (exportColumns.contains(colName)) {
+                    SlotRef slotRef = new SlotRef(this.tableName, colName);
+                    SelectListItem selectListItem = new SelectListItem(slotRef, null);
+                    list.addItem(selectListItem);
+                }
+            }
+        }
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerParallel = getTableRefListPerParallel();
+        LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, tableRefListPerParallel.size());
+
+        // debug LOG output
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < tableRefListPerParallel.size(); i++) {
+                LOG.debug("ExportTaskExecutor {} is responsible for tablets:", 
i);
+                for (TableRef tableRef : tableRefListPerParallel.get(i)) {
+                    LOG.debug("Tablet id: [{}]", 
tableRef.getSampleTabletIds());
+                }
+            }
+        }
+
+        // generate 'select..outfile..' statement
+        for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) {
+            List<SelectStmt> selectStmtLists = Lists.newArrayList();
+            for (TableRef tableRef : tableRefList) {
+                ArrayList<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
+                FromClause fromClause = new FromClause(tmpTableRefList);
+                // generate outfile clause
+                OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
+                SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
+                        null, null, LimitElement.NO_LIMIT);
+                selectStmt.setOutFileClause(outfile);
+                selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+                selectStmtLists.add(selectStmt);
+            }
+            selectStmtListPerParallel.add(selectStmtLists);
+        }
+
+        // debug LOG output
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+                LOG.debug("ExportTaskExecutor {} is responsible for outfile:", 
i);
+                for (SelectStmt outfile : selectStmtListPerParallel.get(i)) {
+                    LOG.debug("outfile sql: [{}]", outfile.toSql());
+                }
+            }
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws UserException {
+        ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets();
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerParallel = Lists.newArrayList();
+        for (ArrayList<Long> tabletsList : tabletsListPerParallel) {
+            ArrayList<TableRef> tableRefList = Lists.newArrayList();
+            for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
+                int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
+                        ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
+                ArrayList<Long> tablets = new ArrayList<>(tabletsList.subList(i, end));
+                TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(),
+                        this.tableRef.getPartitionNames(), tablets,
+                        this.tableRef.getTableSample(), this.tableRef.getCommonHints());
+                tableRefList.add(tblRef);
+            }
+            tableRefListPerParallel.add(tableRefList);
+        }
+        return tableRefListPerParallel;
+    }
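
Each executor's tablet list is chunked into sub-lists of at most
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT tablets, one TableRef (and hence one
outfile statement) per chunk. A standalone sketch of the same batching
(illustrative, not part of the patch):

    // Same batching as the loop above: 24 tablets with a maximum of 10 per
    // outfile yields sub-lists of sizes 10, 10 and 4, i.e. three outfiles.
    static List<List<Long>> chunk(List<Long> tabletIds, int maxPerOutfile) {
        List<List<Long>> batches = new ArrayList<>();
        for (int i = 0; i < tabletIds.size(); i += maxPerOutfile) {
            int end = Math.min(i + maxPerOutfile, tabletIds.size());
            batches.add(new ArrayList<>(tabletIds.subList(i, end)));
        }
        return batches;
    }
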
+
+    private ArrayList<ArrayList<Long>> splitTablets() throws UserException {
         // get tablets
-        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
-        OlapTable table = db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
+        OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
         List<Long> tabletIdList = Lists.newArrayList();
         table.readLock();
         try {
-            Collection<Partition> partitions = new ArrayList<Partition>();
+            final Collection<Partition> partitions = new ArrayList<Partition>();
             // get partitions
             // user specifies partitions, already checked in ExportStmt
             if (this.partitionNames != null) {
-                if (partitionNames.size() > Config.maximum_number_of_export_partitions) {
-                    throw new UserException("The partitions number of this export job is larger than the maximum number"
-                            + " of partitions allowed by a export job");
-                }
-                for (String partName : this.partitionNames) {
-                    partitions.add(table.getPartition(partName));
-                }
+                this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName)));
             } else {
                if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) {
                    throw new UserException("The partitions number of this export job is larger than the maximum number"
                            + " of partitions allowed by a export job");
                 }
-                partitions = table.getPartitions();
+                partitions.addAll(table.getPartitions());
             }
 
             // get tablets
@@ -344,38 +414,34 @@ public class ExportJob implements Writable {
         }
 
         Integer tabletsAllNum = tabletIdList.size();
-        Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
-        Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerQuery * this.parallelNum;
-
-        Integer start = 0;
-
-        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList();
-
-        int outfileNum = this.parallelNum;
-        if (tabletsAllNum < this.parallelNum) {
-            outfileNum = tabletsAllNum;
+        tabletsNum = tabletsAllNum;
+        Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
+        Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
+
+        ArrayList<ArrayList<Long>> tabletsListPerParallel = Lists.newArrayList();
+        Integer realParallelism = this.parallelism;
+        if (tabletsAllNum < this.parallelism) {
+            realParallelism = tabletsAllNum;
             LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller 
than parallelism ({}), "
-                        + "set parallelism to tablets num.", id, 
tabletsAllNum, this.parallelNum);
+                        + "set parallelism to tablets num.", id, 
tabletsAllNum, this.parallelism);
         }
-        for (int i = 0; i < outfileNum; ++i) {
-            Integer tabletsNum = tabletsNumPerQuery;
+        Integer start = 0;
+        for (int i = 0; i < realParallelism; ++i) {
+            Integer tabletsNum = tabletsNumPerParallel;
             if (tabletsNumPerQueryRemainder > 0) {
                 tabletsNum = tabletsNum + 1;
                 --tabletsNumPerQueryRemainder;
             }
            ArrayList<Long> tablets = new ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
             start += tabletsNum;
-            TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tablets,
-                    this.tableRef.getTableSample(), this.tableRef.getCommonHints());
-            ArrayList<TableRef> tableRefList = Lists.newArrayList();
-            tableRefList.add(tblRef);
-            tableRefListPerQuery.add(tableRefList);
+
+            tabletsListPerParallel.add(tablets);
         }
-        return tableRefListPerQuery;
+        return tabletsListPerParallel;
     }
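
splitTablets() hands out the remainder one tablet at a time to the leading
groups, so group sizes differ by at most one, and the number of groups shrinks
to the tablet count when there are fewer tablets than the requested
parallelism. A standalone sketch of the same distribution (illustrative, not
part of the patch):

    // Same even split with remainder as above:
    // e.g. 23 tablets, parallelism 5 -> group sizes [5, 5, 5, 4, 4].
    static List<Integer> groupSizes(int tabletsAllNum, int parallelism) {
        int realParallelism = Math.min(tabletsAllNum, parallelism);
        int base = tabletsAllNum / parallelism;
        int remainder = tabletsAllNum - base * parallelism;
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < realParallelism; i++) {
            sizes.add(base + (remainder-- > 0 ? 1 : 0));
        }
        return sizes;
    }
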
 
     private Map<String, String> convertOutfileProperties() {
-        Map<String, String> outfileProperties = Maps.newHashMap();
+        final Map<String, String> outfileProperties = Maps.newHashMap();
 
         // file properties
         if (format.equals("csv") || format.equals("csv_with_names") || 
format.equals("csv_with_names_and_types")) {
@@ -393,9 +459,7 @@ public class ExportJob implements Writable {
         // outfile clause's broker properties need 'broker.' prefix
         if (brokerDesc.getStorageType() == StorageType.BROKER) {
            outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name", brokerDesc.getName());
-            for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) {
-                outfileProperties.put(BROKER_PROPERTY_PREFIXES + kv.getKey(), kv.getValue());
-            }
+            brokerDesc.getProperties().forEach((k, v) -> outfileProperties.put(BROKER_PROPERTY_PREFIXES + k, v));
         } else {
            for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) {
                 outfileProperties.put(kv.getKey(), kv.getValue());
@@ -404,131 +468,25 @@ public class ExportJob implements Writable {
         return outfileProperties;
     }
 
-    public String getColumns() {
-        return columns;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    public long getDbId() {
-        return dbId;
-    }
-
-    public long getTableId() {
-        return this.tableId;
-    }
-
-    public Expr getWhereExpr() {
-        return whereExpr;
-    }
-
-    public synchronized JobState getState() {
+    public synchronized ExportJobState getState() {
         return state;
     }
 
-    public BrokerDesc getBrokerDesc() {
-        return brokerDesc;
-    }
-
-    public void setBrokerDesc(BrokerDesc brokerDesc) {
-        this.brokerDesc = brokerDesc;
-    }
-
-    public String getExportPath() {
-        return exportPath;
-    }
-
-    public String getColumnSeparator() {
-        return this.columnSeparator;
-    }
-
-    public String getLineDelimiter() {
-        return this.lineDelimiter;
-    }
-
-    public int getTimeoutSecond() {
-        return timeoutSecond;
-    }
-
-    public String getFormat() {
-        return format;
-    }
-
-    public String getMaxFileSize() {
-        return maxFileSize;
-    }
-
-    public String getDeleteExistingFiles() {
-        return deleteExistingFiles;
-    }
-
-    public String getQualifiedUser() {
-        return qualifiedUser;
-    }
-
-    public UserIdentity getUserIdentity() {
-        return userIdentity;
-    }
-
-    public List<String> getPartitions() {
-        return partitionNames;
-    }
-
-    public int getProgress() {
-        return progress;
-    }
-
-    public void setProgress(int progress) {
-        this.progress = progress;
-    }
-
-    public long getCreateTimeMs() {
-        return createTimeMs;
-    }
-
-    public long getStartTimeMs() {
-        return startTimeMs;
-    }
-
-    public void setStartTimeMs(long startTimeMs) {
-        this.startTimeMs = startTimeMs;
-    }
-
-    public long getFinishTimeMs() {
-        return finishTimeMs;
-    }
-
-    public void setFinishTimeMs(long finishTimeMs) {
-        this.finishTimeMs = finishTimeMs;
-    }
-
-    public ExportFailMsg getFailMsg() {
-        return failMsg;
-    }
-
-    public void setFailMsg(ExportFailMsg failMsg) {
-        this.failMsg = failMsg;
-    }
-
-    public String getOutfileInfo() {
-        return outfileInfo;
-    }
-
-    public void setOutfileInfo(String outfileInfo) {
-        this.outfileInfo = outfileInfo;
+    private void setExportJobState(ExportJobState newState) {
+        this.state = newState;
     }
 
-
+    // TODO(ftw): delete
     public synchronized Thread getDoExportingThread() {
         return doExportingThread;
     }
 
+    // TODO(ftw): delete
     public synchronized void setDoExportingThread(Thread isExportingThread) {
         this.doExportingThread = isExportingThread;
     }
 
+    // TODO(ftw): delete
     public synchronized void setStmtExecutor(int idx, StmtExecutor executor) {
         this.stmtExecutorList.set(idx, executor);
     }
@@ -537,51 +495,20 @@ public class ExportJob implements Writable {
         return this.stmtExecutorList.get(idx);
     }
 
-    public List<TScanRangeLocations> getTabletLocations() {
-        return tabletLocations;
-    }
-
-    public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
-        return this.snapshotPaths;
-    }
-
-    public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
-        this.snapshotPaths.add(snapshotPath);
-    }
-
-    public String getSql() {
-        return sql;
-    }
-
-    public ExportExportingTask getTask() {
-        return task;
-    }
-
-    public void setTask(ExportExportingTask task) {
-        this.task = task;
-    }
-
-    public TableName getTableName() {
-        return tableName;
-    }
-
-    public SessionVariable getSessionVariables() {
-        return sessionVariables;
-    }
-
+    // TODO(ftw): delete
    public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
         if (msg != null) {
             failMsg = new ExportFailMsg(type, msg);
         }
 
         // maybe user cancel this job
-        if (task != null && state == JobState.EXPORTING && stmtExecutorList != null) {
+        if (task != null && state == ExportJobState.EXPORTING && stmtExecutorList != null) {
             for (int idx = 0; idx < stmtExecutorList.size(); ++idx) {
                 stmtExecutorList.get(idx).cancel();
             }
         }
 
-        if (updateState(ExportJob.JobState.CANCELLED, false)) {
+        if (updateState(ExportJobState.CANCELLED, false)) {
             // release snapshot
             // Status releaseSnapshotStatus = releaseSnapshotPaths();
             // if (!releaseSnapshotStatus.ok()) {
@@ -592,23 +519,150 @@ public class ExportJob implements Writable {
         }
     }
 
+    public synchronized void updateExportJobState(ExportJobState newState, Long taskId,
+            List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException {
+        switch (newState) {
+            case PENDING:
+                throw new JobException("Can not update ExportJob state to 
'PENDING', job id: [{}], task id: [{}]",
+                        id, taskId);
+            case EXPORTING:
+                exportExportJob();
+                break;
+            case CANCELLED:
+                cancelExportTask(type, msg);
+                break;
+            case FINISHED:
+                finishExportTask(taskId, outfileInfoList);
+                break;
+            default:
+                return;
+        }
+    }
+
+    public void cancelReplayedExportJob(ExportFailMsg.CancelType type, String msg) {
+        setExportJobState(ExportJobState.CANCELLED);
+        failMsg = new ExportFailMsg(type, msg);
+    }
+
+    private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws JobException {
+        if (getState() == ExportJobState.CANCELLED) {
+            return;
+        }
+
+        if (getState() == ExportJobState.FINISHED) {
+            throw new JobException("Job {} has finished, can not been 
cancelled", id);
+        }
+
+        if (getState() == ExportJobState.PENDING) {
+            startTimeMs = System.currentTimeMillis();
+        }
+
+        // we need cancel all task
+        taskIdToExecutor.keySet().forEach(id -> {
+            try {
+                register.cancelTask(id);
+            } catch (JobException e) {
+                LOG.warn("cancel export task {} exception: {}", id, e);
+            }
+        });
+
+        cancelExportJobUnprotected(type, msg);
+    }
+
+    private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String msg) {
+        setExportJobState(ExportJobState.CANCELLED);
+        finishTimeMs = System.currentTimeMillis();
+        failMsg = new ExportFailMsg(type, msg);
+        Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED);
+    }
+
+    // TODO(ftw): delete
     public synchronized boolean finish(List<OutfileInfo> outfileInfoList) {
         outfileInfo = GsonUtils.GSON.toJson(outfileInfoList);
-        if (updateState(ExportJob.JobState.FINISHED)) {
+        if (updateState(ExportJobState.FINISHED)) {
             return true;
         }
         return false;
     }
 
-    public synchronized boolean updateState(ExportJob.JobState newState) {
+    private void exportExportJob() {
+        // The first exportTaskExecutor will set state to EXPORTING,
+        // other exportTaskExecutors do not need to set up state.
+        if (getState() == ExportJobState.EXPORTING) {
+            return;
+        }
+        setExportJobState(ExportJobState.EXPORTING);
+        // if isReplay == true, startTimeMs will be read from LOG
+        startTimeMs = System.currentTimeMillis();
+    }
+
+    private void finishExportTask(Long taskId, List<OutfileInfo> outfileInfoList) throws JobException {
+        if (getState() == ExportJobState.CANCELLED) {
+            throw new JobException("Job [{}] has been cancelled, can not 
finish this task: {}", id, taskId);
+        }
+
+        allOutfileInfo.add(outfileInfoList);
+        ++finishedTaskCount;
+
+        // calculate progress
+        int tmpProgress = finishedTaskCount * 100 / jobExecutorList.size();
+        if (finishedTaskCount * 100 / jobExecutorList.size() >= 100) {
+            progress = 99;
+        } else {
+            progress = tmpProgress;
+        }
+
+        // if all task finished
+        if (finishedTaskCount == jobExecutorList.size()) {
+            finishExportJobUnprotected();
+        }
+    }
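
Note that the progress computed in finishExportTask() is deliberately capped
at 99; only finishExportJobUnprotected() sets 100 once every executor has
reported back, so a job never shows 100% before its FINISHED state is logged.
A worked example (illustrative, not part of the patch):

    // Illustrative progression with jobExecutorList.size() == 4:
    // finishedTaskCount 1 -> 25, 2 -> 50, 3 -> 75, 4 -> 99, then 100 (FINISHED).
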
+
+    private void finishExportJobUnprotected() {
+        progress = 100;
+        setExportJobState(ExportJobState.FINISHED);
+        finishTimeMs = System.currentTimeMillis();
+        outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
+        Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED);
+    }
+
+    public void replayExportJobState(ExportJobState newState) {
+        switch (newState) {
+            // We no longer persist the EXPORTING state in the new version of metadata,
+            // but it may still exist in metadata written by older versions.
+            // So when replaying an EXPORTING state, we set newState = CANCELLED.
+            case EXPORTING:
+            // The IN_QUEUE state is not needed in the new version of export,
+            // but it may still exist in metadata written by older versions.
+            // So when replaying an IN_QUEUE state, we also set newState = CANCELLED.
+            case IN_QUEUE:
+                newState = ExportJobState.CANCELLED;
+                break;
+            case PENDING:
+            case CANCELLED:
+                progress = 0;
+                break;
+            case FINISHED:
+                progress = 100;
+                break;
+            default:
+                Preconditions.checkState(false, "wrong job state: " + newState.name());
+                break;
+        }
+        setExportJobState(newState);
+    }
+
+    // TODO(ftw): delete
+    public synchronized boolean updateState(ExportJobState newState) {
         return this.updateState(newState, false);
     }
 
-    public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) {
+    // TODO(ftw): delete
+    public synchronized boolean updateState(ExportJobState newState, boolean isReplay) {
         // We do not persist EXPORTING state in new version of metadata,
         // but EXPORTING state may still exist in older versions of metadata.
         // So if isReplay == true and newState == EXPORTING, we just ignore this update.
-        if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) {
+        if (isFinalState() || (isReplay && newState == ExportJobState.EXPORTING)) {
             return false;
         }
         state = newState;
@@ -618,7 +672,7 @@ public class ExportJob implements Writable {
                 progress = 0;
                 break;
             case EXPORTING:
-                // if isReplay == true, startTimeMs will be read from log
+                // if isReplay == true, startTimeMs will be read from the edit log
                 if (!isReplay) {
                     startTimeMs = System.currentTimeMillis();
                 }
@@ -630,7 +684,7 @@ public class ExportJob implements Writable {
                 progress = 100;
                 break;
             case CANCELLED:
-                // if isReplay == true, finishTimeMs will be read from log
+                // if isReplay == true, finishTimeMs will be read from the edit log
                 if (!isReplay) {
                     finishTimeMs = System.currentTimeMillis();
                 }
@@ -640,27 +694,19 @@ public class ExportJob implements Writable {
                 break;
         }
         // we only persist Pending/Cancel/Finish state
-        if (!isReplay && newState != JobState.IN_QUEUE && newState != JobState.EXPORTING) {
+        if (!isReplay && newState != ExportJobState.IN_QUEUE && newState != ExportJobState.EXPORTING) {
             Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState);
         }
         return true;
     }
 
     public synchronized boolean isFinalState() {
-        return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED;
+        return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED;
     }
 
     public boolean isExpired(long curTime) {
         return (curTime - createTimeMs) / 1000 > Config.history_job_keep_max_second
-                && (state == ExportJob.JobState.CANCELLED || state == ExportJob.JobState.FINISHED);
-    }
-
-    public String getLabel() {
-        return label;
-    }
-
-    public String getQueryId() {
-        return queryId;
+                && (state == ExportJobState.CANCELLED || state == ExportJobState.FINISHED);
     }
 
     @Override
@@ -737,7 +783,7 @@ public class ExportJob implements Writable {
             }
         }
 
-        state = JobState.valueOf(Text.readString(in));
+        state = ExportJobState.valueOf(Text.readString(in));
         createTimeMs = in.readLong();
         startTimeMs = in.readLong();
         finishTimeMs = in.readLong();
@@ -793,132 +839,4 @@ public class ExportJob implements Writable {
 
         return false;
     }
-
-    public boolean isReplayed() {
-        return isReplayed;
-    }
-
-    // for only persist op when switching job state.
-    public static class StateTransfer implements Writable {
-        @SerializedName("jobId")
-        long jobId;
-        @SerializedName("state")
-        JobState state;
-        @SerializedName("startTimeMs")
-        private long startTimeMs;
-        @SerializedName("finishTimeMs")
-        private long finishTimeMs;
-        @SerializedName("failMsg")
-        private ExportFailMsg failMsg;
-        @SerializedName("outFileInfo")
-        private String outFileInfo;
-
-        // used for reading from one log
-        public StateTransfer() {
-            this.jobId = -1;
-            this.state = JobState.CANCELLED;
-            this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
-            this.outFileInfo = "";
-        }
-
-        // used for persisting one log
-        public StateTransfer(long jobId, JobState state) {
-            this.jobId = jobId;
-            this.state = state;
-            ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
-            this.startTimeMs = job.getStartTimeMs();
-            this.finishTimeMs = job.getFinishTimeMs();
-            this.failMsg = job.getFailMsg();
-            this.outFileInfo = job.getOutfileInfo();
-        }
-
-        public long getJobId() {
-            return jobId;
-        }
-
-        public JobState getState() {
-            return state;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            String json = GsonUtils.GSON.toJson(this);
-            Text.writeString(out, json);
-        }
-
-        public static StateTransfer read(DataInput in) throws IOException {
-            if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
-                StateTransfer transfer = new StateTransfer();
-                transfer.readFields(in);
-                return transfer;
-            }
-            String json = Text.readString(in);
-            StateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJob.StateTransfer.class);
-            return transfer;
-        }
-
-        private void readFields(DataInput in) throws IOException {
-            jobId = in.readLong();
-            state = JobState.valueOf(Text.readString(in));
-        }
-
-        public long getStartTimeMs() {
-            return startTimeMs;
-        }
-
-        public long getFinishTimeMs() {
-            return finishTimeMs;
-        }
-
-        public String getOutFileInfo() {
-            return outFileInfo;
-        }
-
-        public ExportFailMsg getFailMsg() {
-            return failMsg;
-        }
-    }
-
-    public static class OutfileInfo {
-        @SerializedName("fileNumber")
-        private String fileNumber;
-        @SerializedName("totalRows")
-        private String totalRows;
-        @SerializedName("fileSize")
-        private String fileSize;
-        @SerializedName("url")
-        private String url;
-
-        public String getUrl() {
-            return url;
-        }
-
-        public void setUrl(String url) {
-            this.url = url;
-        }
-
-        public String getFileNumber() {
-            return fileNumber;
-        }
-
-        public void setFileNumber(String fileNumber) {
-            this.fileNumber = fileNumber;
-        }
-
-        public String getTotalRows() {
-            return totalRows;
-        }
-
-        public void setTotalRows(String totalRows) {
-            this.totalRows = totalRows;
-        }
-
-        public String getFileSize() {
-            return fileSize;
-        }
-
-        public void setFileSize(String fileSize) {
-            this.fileSize = fileSize;
-        }
-    }
 }
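
For reference, the progress rule in finishExportTask above keeps the reported progress below 100 until the last task completes; only finishExportJobUnprotected sets 100. A minimal stand-alone sketch of that rule (illustrative only, not part of the patch):

    // Progress is held at 99 until every ExportTaskExecutor has reported in.
    static int computeProgress(int finishedTaskCount, int totalTaskCount) {
        int tmpProgress = finishedTaskCount * 100 / totalTaskCount;
        return finishedTaskCount == totalTaskCount ? 100 : Math.min(tmpProgress, 99);
    }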
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java
new file mode 100644
index 0000000000..4fd4cabf80
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+public enum ExportJobState {
+
+    /**
+     * The initial state of an export job.
+     */
+    PENDING,
+
+    /**
+     * The export job is waiting to be scheduled.
+     */
+    IN_QUEUE,
+
+    /**
+     * When the export job is exporting, the EXPORTING state will be triggered.
+     */
+    EXPORTING,
+
+    /**
+     * When the export job is finished, the FINISHED state will be triggered.
+     */
+    FINISHED,
+
+    /**
+     * When the export job is cancelled, the CANCELLED state will be triggered.
+     */
+    CANCELLED,
+}
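
For orientation (not part of the patch): a job normally moves PENDING -> EXPORTING -> FINISHED, with CANCELLED reachable from any non-final state, and IN_QUEUE is kept only so that metadata written by older versions can still be replayed. The terminal-state check used throughout the patch amounts to:

    // Sketch matching ExportJob.isFinalState(): only these two states are terminal.
    static boolean isFinal(ExportJobState state) {
        return state == ExportJobState.CANCELLED || state == ExportJobState.FINISHED;
    }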
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java
new file mode 100644
index 0000000000..06253b1f1e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+@Getter
+public class ExportJobStateTransfer implements Writable {
+    @SerializedName("jobId")
+    long jobId;
+    @SerializedName("state")
+    private ExportJobState state;
+    @SerializedName("startTimeMs")
+    private long startTimeMs;
+    @SerializedName("finishTimeMs")
+    private long finishTimeMs;
+    @SerializedName("failMsg")
+    private ExportFailMsg failMsg;
+    @SerializedName("outFileInfo")
+    private String outFileInfo;
+
+    // used for reading from one log
+    public ExportJobStateTransfer() {
+        this.jobId = -1;
+        this.state = ExportJobState.CANCELLED;
+        this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
+        this.outFileInfo = "";
+    }
+
+    // used for persisting one log
+    public ExportJobStateTransfer(long jobId, ExportJobState state) {
+        this.jobId = jobId;
+        this.state = state;
+        ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
+        this.startTimeMs = job.getStartTimeMs();
+        this.finishTimeMs = job.getFinishTimeMs();
+        this.failMsg = job.getFailMsg();
+        this.outFileInfo = job.getOutfileInfo();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static ExportJobStateTransfer read(DataInput in) throws IOException {
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
+            ExportJobStateTransfer transfer = new ExportJobStateTransfer();
+            transfer.readFields(in);
+            return transfer;
+        }
+        String json = Text.readString(in);
+        ExportJobStateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJobStateTransfer.class);
+        return transfer;
+    }
+
+    private void readFields(DataInput in) throws IOException {
+        jobId = in.readLong();
+        state = ExportJobState.valueOf(Text.readString(in));
+    }
+}
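
For illustration only: the transfer record round-trips through a single JSON string (write()/read() above wrap it with Text.writeString/readString on the edit log). A minimal sketch using plain Gson in place of the internal GsonUtils wrapper, assuming the class above is on the classpath:

    import com.google.gson.Gson;

    public class StateTransferRoundTripSketch {
        public static void main(String[] args) {
            Gson gson = new Gson();
            // The no-arg constructor defaults the state to CANCELLED.
            ExportJobStateTransfer transfer = new ExportJobStateTransfer();
            String json = gson.toJson(transfer);
            ExportJobStateTransfer restored = gson.fromJson(json, ExportJobStateTransfer.class);
            System.out.println(json + " -> " + restored.getState());
        }
    }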
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 0e0b26f273..9a0bc7953f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -35,9 +35,9 @@ import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.ExportJob.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.task.ExportExportingTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
@@ -69,8 +69,8 @@ public class ExportMgr extends MasterDaemon {
     // lock is private and must use after db lock
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
-    private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob
-    private Map<String, Long> labelToJobId = Maps.newHashMap();
+    private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
+    private Map<String, Long> labelToExportJobId = Maps.newHashMap();
 
     private MasterTaskExecutor exportingExecutor;
 
@@ -103,7 +103,7 @@ public class ExportMgr extends MasterDaemon {
 
     @Override
     protected void runAfterCatalogReady() {
-        List<ExportJob> pendingJobs = getExportJobs(JobState.PENDING);
+        List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
         List<ExportJob> newInQueueJobs = Lists.newArrayList();
         for (ExportJob job : pendingJobs) {
             if (handlePendingJobs(job)) {
@@ -128,7 +128,7 @@ public class ExportMgr extends MasterDaemon {
 
     private boolean handlePendingJobs(ExportJob job) {
         // because maybe this job has been cancelled by user.
-        if (job.getState() != JobState.PENDING) {
+        if (job.getState() != ExportJobState.PENDING) {
             return false;
         }
 
@@ -136,11 +136,12 @@ public class ExportMgr extends MasterDaemon {
             // If the job is created from replay thread, all plan info will be lost.
             // so the job has to be cancelled.
             String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
-            job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
+            job.cancelReplayedExportJob(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
             return false;
         }
 
-        if (job.updateState(JobState.IN_QUEUE)) {
+        if (job.updateState(ExportJobState.IN_QUEUE)) {
             LOG.info("Exchange pending status to in_queue status success. job: 
{}", job);
             return true;
         }
@@ -148,7 +149,7 @@ public class ExportMgr extends MasterDaemon {
     }
 
     public List<ExportJob> getJobs() {
-        return Lists.newArrayList(idToJob.values());
+        return Lists.newArrayList(exportIdToJob.values());
     }
 
     public void addExportJob(ExportStmt stmt) throws Exception {
@@ -156,7 +157,7 @@ public class ExportMgr extends MasterDaemon {
         ExportJob job = createJob(jobId, stmt);
         writeLock();
         try {
-            if (labelToJobId.containsKey(job.getLabel())) {
+            if (labelToExportJobId.containsKey(job.getLabel())) {
                 throw new LabelAlreadyUsedException(job.getLabel());
             }
             unprotectAddJob(job);
@@ -167,6 +168,28 @@ public class ExportMgr extends MasterDaemon {
         LOG.info("add export job. {}", job);
     }
 
+    public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception {
+        ExportJob job = stmt.getExportJob();
+        long jobId = Env.getCurrentEnv().getNextId();
+        job.setId(jobId);
+        writeLock();
+        try {
+            if (labelToExportJobId.containsKey(job.getLabel())) {
+                throw new LabelAlreadyUsedException(job.getLabel());
+            }
+            unprotectAddJob(job);
+            job.getJobExecutorList().forEach(executor -> {
+                Long taskId = ExportJob.register.registerTask(executor);
+                executor.setTaskId(taskId);
+                job.getTaskIdToExecutor().put(taskId, executor);
+            });
+            Env.getCurrentEnv().getEditLog().logExportCreate(job);
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("add export job. {}", job);
+    }
+
     public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
         // List of export jobs waiting to be cancelled
         List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
@@ -178,14 +201,20 @@ public class ExportMgr extends MasterDaemon {
         if (matchExportJobs.isEmpty()) {
             throw new DdlException("All export job(s) are at final state 
(CANCELLED/FINISHED)");
         }
-        for (ExportJob exportJob : matchExportJobs) {
-            exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+        try {
+            for (ExportJob exportJob : matchExportJobs) {
+                exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
+                        ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+            }
+        } catch (JobException e) {
+            throw new AnalysisException(e.getMessage());
         }
     }
 
     public void unprotectAddJob(ExportJob job) {
-        idToJob.put(job.getId(), job);
-        labelToJobId.putIfAbsent(job.getLabel(), job.getId());
+        exportIdToJob.put(job.getId(), job);
+        labelToExportJobId.putIfAbsent(job.getLabel(), job.getId());
     }
 
     private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
@@ -224,28 +253,28 @@ public class ExportMgr extends MasterDaemon {
         };
     }
 
-    private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
-        ExportJob job = new ExportJob(jobId);
-        job.setJob(stmt);
-        return job;
+    private ExportJob createJob(long jobId, ExportStmt stmt) {
+        ExportJob exportJob = stmt.getExportJob();
+        exportJob.setId(jobId);
+        return exportJob;
     }
 
     public ExportJob getJob(long jobId) {
-        ExportJob job = null;
+        ExportJob job;
         readLock();
         try {
-            job = idToJob.get(jobId);
+            job = exportIdToJob.get(jobId);
         } finally {
             readUnlock();
         }
         return job;
     }
 
-    public List<ExportJob> getExportJobs(ExportJob.JobState state) {
+    public List<ExportJob> getExportJobs(ExportJobState state) {
         List<ExportJob> result = Lists.newArrayList();
         readLock();
         try {
-            for (ExportJob job : idToJob.values()) {
+            for (ExportJob job : exportIdToJob.values()) {
                 if (job.getState() == state) {
                     result.add(job);
                 }
@@ -260,7 +289,7 @@ public class ExportMgr extends MasterDaemon {
     // used for `show export` statement
     // NOTE: jobid and states may both be specified, or only one of them, or neither
     public List<List<String>> getExportJobInfosByIdOrState(
-            long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJob.JobState> states,
+            long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJobState> states,
             ArrayList<OrderByPair> orderByPairs, long limit) throws AnalysisException {
 
         long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
@@ -273,9 +302,9 @@ public class ExportMgr extends MasterDaemon {
         readLock();
         try {
             int counter = 0;
-            for (ExportJob job : idToJob.values()) {
+            for (ExportJob job : exportIdToJob.values()) {
                 long id = job.getId();
-                ExportJob.JobState state = job.getState();
+                ExportJobState state = job.getState();
                 String jobLabel = job.getLabel();
 
                 if (job.getDbId() != dbId) {
@@ -345,7 +374,7 @@ public class ExportMgr extends MasterDaemon {
         readLock();
         try {
             int counter = 0;
-            for (ExportJob job : idToJob.values()) {
+            for (ExportJob job : exportIdToJob.values()) {
                 // check auth
                 if (isJobShowable(job)) {
                     exportJobInfos.add(composeExportJobInfo(job));
@@ -406,7 +435,7 @@ public class ExportMgr extends MasterDaemon {
 
         // task infos
         Map<String, Object> infoMap = Maps.newHashMap();
-        List<String> partitions = job.getPartitions();
+        List<String> partitions = job.getPartitionNames();
         if (partitions == null) {
             partitions = Lists.newArrayList();
             partitions.add("*");
@@ -422,7 +451,7 @@ public class ExportMgr extends MasterDaemon {
         infoMap.put("format", job.getFormat());
         infoMap.put("line_delimiter", job.getLineDelimiter());
         infoMap.put("columns", job.getColumns());
-        infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : 
job.getTabletLocations().size());
+        infoMap.put("tablet_num", job.getTabletsNum());
         infoMap.put("max_file_size", job.getMaxFileSize());
         infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
         jobInfo.add(new Gson().toJson(infoMap));
@@ -435,7 +464,7 @@ public class ExportMgr extends MasterDaemon {
         jobInfo.add(job.getTimeoutSecond());
 
         // error msg
-        if (job.getState() == ExportJob.JobState.CANCELLED) {
+        if (job.getState() == ExportJobState.CANCELLED) {
             ExportFailMsg failMsg = job.getFailMsg();
             jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + 
failMsg.getMsg());
         } else {
@@ -443,7 +472,7 @@ public class ExportMgr extends MasterDaemon {
         }
 
         // outfileInfo
-        if (job.getState() == JobState.FINISHED) {
+        if (job.getState() == ExportJobState.FINISHED) {
             jobInfo.add(job.getOutfileInfo());
         } else {
             jobInfo.add(FeConstants.null_string);
@@ -457,15 +486,15 @@ public class ExportMgr extends MasterDaemon {
 
         writeLock();
         try {
-            Iterator<Map.Entry<Long, ExportJob>> iter = idToJob.entrySet().iterator();
+            Iterator<Map.Entry<Long, ExportJob>> iter = exportIdToJob.entrySet().iterator();
             while (iter.hasNext()) {
                 Map.Entry<Long, ExportJob> entry = iter.next();
                 ExportJob job = entry.getValue();
                 if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second
-                        && (job.getState() == ExportJob.JobState.CANCELLED
-                            || job.getState() == ExportJob.JobState.FINISHED)) {
+                        && (job.getState() == ExportJobState.CANCELLED
+                            || job.getState() == ExportJobState.FINISHED)) {
                     iter.remove();
-                    labelToJobId.remove(job.getLabel(), job.getId());
+                    labelToExportJobId.remove(job.getLabel(), job.getId());
                 }
             }
         } finally {
@@ -482,11 +511,12 @@ public class ExportMgr extends MasterDaemon {
         }
     }
 
-    public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) {
+    public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
         readLock();
         try {
-            ExportJob job = idToJob.get(stateTransfer.getJobId());
-            job.updateState(stateTransfer.getState(), true);
+            ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
+            job.replayExportJobState(stateTransfer.getState());
             job.setStartTimeMs(stateTransfer.getStartTimeMs());
             job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
             job.setFailMsg(stateTransfer.getFailMsg());
@@ -496,11 +526,11 @@ public class ExportMgr extends MasterDaemon {
         }
     }
 
-    public long getJobNum(ExportJob.JobState state, long dbId) {
+    public long getJobNum(ExportJobState state, long dbId) {
         int size = 0;
         readLock();
         try {
-            for (ExportJob job : idToJob.values()) {
+            for (ExportJob job : exportIdToJob.values()) {
                 if (job.getState() == state && job.getDbId() == dbId) {
                     ++size;
                 }
@@ -511,11 +541,11 @@ public class ExportMgr extends MasterDaemon {
         return size;
     }
 
-    public long getJobNum(ExportJob.JobState state) {
+    public long getJobNum(ExportJobState state) {
         int size = 0;
         readLock();
         try {
-            for (ExportJob job : idToJob.values()) {
+            for (ExportJob job : exportIdToJob.values()) {
                 if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
                         Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(),
                         PrivPredicate.LOAD)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
new file mode 100644
index 0000000000..5fdc6962fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.analysis.OutFileClause;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.ExportFailMsg.CancelType;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class ExportTaskExecutor implements TransientTaskExecutor {
+
+    List<SelectStmt> selectStmtLists;
+
+    ExportJob exportJob;
+
+    @Setter
+    Long taskId;
+
+    private StmtExecutor stmtExecutor;
+
+    private AtomicBoolean isCanceled;
+
+    private AtomicBoolean isFinished;
+
+    ExportTaskExecutor(List<SelectStmt> selectStmtLists, ExportJob exportJob) {
+        this.selectStmtLists = selectStmtLists;
+        this.exportJob = exportJob;
+        this.isCanceled = new AtomicBoolean(false);
+        this.isFinished = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void execute() throws JobException {
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task 
id: {}", taskId);
+        }
+        exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
+        List<OutfileInfo> outfileInfoList = Lists.newArrayList();
+        for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
+            if (isCanceled.get()) {
+                throw new JobException("Export executor has been canceled, 
task id: {}", taskId);
+            }
+            // check the version of tablets
+            try {
+                Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
+                        exportJob.getTableName().getDb());
+                OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
+                table.readLock();
+                try {
+                    SelectStmt selectStmt = selectStmtLists.get(idx);
+                    List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
+                    for (Long tabletId : tabletIds) {
+                        TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex()
+                                .getTabletMeta(tabletId);
+                        Partition partition = table.getPartition(tabletMeta.getPartitionId());
+                        long nowVersion = partition.getVisibleVersion();
+                        long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
+                        if (nowVersion != oldVersion) {
+                            exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                                    CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
+                            throw new JobException("Export Job[{}]: Tablet {} has changed version, "
+                                    + "old version = {}, now version = {}",
+                                    exportJob.getId(), tabletId, oldVersion, nowVersion);
+                        }
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            } catch (AnalysisException e) {
+                exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                        ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+                throw new JobException(e);
+            }
+
+            try (AutoCloseConnectContext r = buildConnectContext()) {
+                stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx));
+                stmtExecutor.execute();
+                if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
+                    exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                            ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage());
+                    return;
+                }
+                OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
+                outfileInfoList.add(outfileInfo);
+            } catch (Exception e) {
+                exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                        ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+                throw new JobException(e);
+            } finally {
+                stmtExecutor.addProfileToSpan();
+            }
+        }
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task 
id: {}", taskId);
+        }
+        exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
+        isFinished.getAndSet(true);
+    }
+
+    @Override
+    public void cancel() throws JobException {
+        if (isFinished.get()) {
+            throw new JobException("Export executor has finished, task id: 
{}", taskId);
+        }
+        isCanceled.getAndSet(true);
+        if (stmtExecutor != null) {
+            stmtExecutor.cancel();
+        }
+    }
+
+    private AutoCloseConnectContext buildConnectContext() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setSessionVariable(exportJob.getSessionVariables());
+        connectContext.setEnv(Env.getCurrentEnv());
+        connectContext.setDatabase(exportJob.getTableName().getDb());
+        connectContext.setQualifiedUser(exportJob.getQualifiedUser());
+        connectContext.setCurrentUserIdentity(exportJob.getUserIdentity());
+        UUID uuid = UUID.randomUUID();
+        TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        connectContext.setQueryId(queryId);
+        connectContext.setStartTime();
+        connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+        return new AutoCloseConnectContext(connectContext);
+    }
+
+    private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
+        OutfileInfo outfileInfo = new OutfileInfo();
+        outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
+        outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
+        outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
+        outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
+        return outfileInfo;
+    }
+}
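
Each ExportTaskExecutor runs one or more outfile statements, and the new config maximum_tablets_of_outfile_in_export bounds how many tablets a single outfile statement may cover. The actual splitting lives in ExportStmt, which is not shown in this excerpt; a hedged sketch of the assumed chunking rule, using Guava:

    import com.google.common.collect.Lists;
    import java.util.List;

    // Assumed rule: at most maxTabletsPerOutfile tablets per outfile statement
    // (Config.maximum_tablets_of_outfile_in_export, default 10).
    static List<List<Long>> splitTablets(List<Long> tabletIds, int maxTabletsPerOutfile) {
        return Lists.partition(tabletIds, maxTabletsPerOutfile);
    }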
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
new file mode 100644
index 0000000000..b9befd9d32
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+@Data
+public class OutfileInfo {
+
+    @SerializedName("fileNumber")
+    private String fileNumber;
+
+    @SerializedName("totalRows")
+    private String totalRows;
+
+    @SerializedName("fileSize")
+    private String fileSize;
+
+    @SerializedName("url")
+    private String url;
+}
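
Note that ExportJob now collects one List<OutfileInfo> per finished task into allOutfileInfo and serializes that nested list as the outfileInfo column of SHOW EXPORT, which is why the regression tests below index json.fileNumber[0][0] instead of json.fileNumber[0]. An illustrative sketch of the resulting shape:

    import com.google.common.collect.Lists;
    import com.google.gson.Gson;
    import java.util.List;

    // Builds [[{"fileNumber":"1"}]] -- one inner list per finished task.
    static String outfileInfoJsonSketch() {
        OutfileInfo info = new OutfileInfo(); // lombok @Data supplies the setters
        info.setFileNumber("1");
        List<List<OutfileInfo>> all = Lists.newArrayList();
        all.add(Lists.newArrayList(info));
        return new Gson().toJson(all);
    }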
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 68d2153556..9a1841be69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -61,6 +61,8 @@ import org.apache.doris.journal.local.LocalJournal;
 import org.apache.doris.load.DeleteHandler;
 import org.apache.doris.load.DeleteInfo;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
+import org.apache.doris.load.ExportJobStateTransfer;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@@ -365,7 +367,7 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_EXPORT_UPDATE_STATE: {
-                    ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData();
+                    ExportJobStateTransfer op = (ExportJobStateTransfer) journal.getData();
                     ExportMgr exportMgr = env.getExportMgr();
                     exportMgr.replayUpdateJobState(op);
                     break;
@@ -1461,8 +1463,8 @@ public class EditLog {
         logEdit(OperationType.OP_EXPORT_CREATE, job);
     }
 
-    public void logExportUpdateState(long jobId, ExportJob.JobState newState) {
-        ExportJob.StateTransfer transfer = new ExportJob.StateTransfer(jobId, newState);
+    public void logExportUpdateState(long jobId, ExportJobState newState) {
+        ExportJobStateTransfer transfer = new ExportJobStateTransfer(jobId, newState);
         logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 11190edc8b..7637c3869d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -181,7 +181,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
 import org.apache.doris.load.DeleteHandler;
-import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadJob;
@@ -1922,8 +1922,8 @@ public class ShowExecutor {
 
         ExportMgr exportMgr = env.getExportMgr();
 
-        Set<ExportJob.JobState> states = null;
-        ExportJob.JobState state = showExportStmt.getJobState();
+        Set<ExportJobState> states = null;
+        ExportJobState state = showExportStmt.getJobState();
         if (state != null) {
             states = Sets.newHashSet(state);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index cdd0d66f89..3bbd96408a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2213,7 +2213,8 @@ public class StmtExecutor {
 
     private void handleExportStmt() throws Exception {
         ExportStmt exportStmt = (ExportStmt) parsedStmt;
-        context.getEnv().getExportMgr().addExportJob(exportStmt);
+        context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt);
     }
 
     private void handleCtasStmt() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 48f3ce609e..1b17806c71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -30,8 +30,8 @@ import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.load.ExportFailMsg;
 import org.apache.doris.load.ExportFailMsg.CancelType;
 import org.apache.doris.load.ExportJob;
-import org.apache.doris.load.ExportJob.JobState;
-import org.apache.doris.load.ExportJob.OutfileInfo;
+import org.apache.doris.load.ExportJobState;
+import org.apache.doris.load.OutfileInfo;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -69,9 +69,9 @@ public class ExportExportingTask extends MasterTask {
 
         private ExportFailMsg failMsg;
 
-        private ExportJob.OutfileInfo outfileInfo;
+        private OutfileInfo outfileInfo;
 
-        public ExportResult(boolean isFailed, ExportFailMsg failMsg, ExportJob.OutfileInfo outfileInfo) {
+        public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) {
             this.isFailed = isFailed;
             this.failMsg = failMsg;
             this.outfileInfo = outfileInfo;
@@ -93,11 +93,11 @@ public class ExportExportingTask extends MasterTask {
 
     @Override
     protected void exec() {
-        if (job.getState() == JobState.IN_QUEUE) {
+        if (job.getState() == ExportJobState.IN_QUEUE) {
             handleInQueueState();
         }
 
-        if (job.getState() != ExportJob.JobState.EXPORTING) {
+        if (job.getState() != ExportJobState.EXPORTING) {
             return;
         }
         LOG.info("begin execute export job in exporting state. job: {}", job);
@@ -112,7 +112,7 @@ public class ExportExportingTask extends MasterTask {
 
         List<SelectStmt> selectStmtList = job.getSelectStmtList();
         int completeTaskNum = 0;
-        List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
+        List<OutfileInfo> outfileInfoList = Lists.newArrayList();
 
         int parallelNum = selectStmtList.size();
         CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool);
@@ -122,7 +122,7 @@ public class ExportExportingTask extends MasterTask {
             final int idx = i;
             completionService.submit(() -> {
                 // maybe user cancelled this job
-                if (job.getState() != JobState.EXPORTING) {
+                if (job.getState() != ExportJobState.EXPORTING) {
                     return new ExportResult(true, null, null);
                 }
                 try {
@@ -162,7 +162,7 @@ public class ExportExportingTask extends MasterTask {
                        return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
                                r.connectContext.getState().getErrorMessage()), null);
                     }
-                    ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
+                    OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
                     return new ExportResult(false, null, outfileInfo);
                 } catch (Exception e) {
                    return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
@@ -250,8 +250,8 @@ public class ExportExportingTask extends MasterTask {
         return new AutoCloseConnectContext(connectContext);
     }
 
-    private ExportJob.OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
-        ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo();
+    private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
+        OutfileInfo outfileInfo = new OutfileInfo();
         outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
         outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
         outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
@@ -274,7 +274,7 @@ public class ExportExportingTask extends MasterTask {
         //     return;
         // }
 
-        if (job.updateState(ExportJob.JobState.EXPORTING)) {
+        if (job.updateState(ExportJobState.EXPORTING)) {
             LOG.info("Exchange pending status to exporting status success. job: {}", job);
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
index c06cae0500..1e49f207d0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.load.ExportMgr;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -154,12 +155,12 @@ public class CancelExportStmtTest extends TestWithFeService {
         List<ExportJob> exportJobList2 = Lists.newLinkedList();
         ExportJob job1 = new ExportJob();
         ExportJob job2 = new ExportJob();
-        job2.updateState(ExportJob.JobState.CANCELLED, true);
+        job2.updateState(ExportJobState.CANCELLED, true);
         ExportJob job3 = new ExportJob();
-        job3.updateState(ExportJob.JobState.EXPORTING, false);
+        job3.updateState(ExportJobState.EXPORTING, false);
         ExportJob job4 = new ExportJob();
         ExportJob job5 = new ExportJob();
-        job5.updateState(ExportJob.JobState.IN_QUEUE, false);
+        job5.updateState(ExportJobState.IN_QUEUE, false);
         exportJobList1.add(job1);
         exportJobList1.add(job2);
         exportJobList1.add(job3);
@@ -188,15 +189,6 @@ public class CancelExportStmtTest extends TestWithFeService {
 
         Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
 
-        stateStringLiteral = new StringLiteral("IN_QUEUE");
-        stateEqPredicate =
-                new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
-        stmt = new CancelExportStmt(null, stateEqPredicate);
-        stmt.analyze(analyzer);
-        filter = ExportMgr.buildCancelJobFilter(stmt);
-
-        Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
-
     }
 
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 7d1cd5a47a..e7209d58a4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.VariableAnnotation;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
 import org.apache.doris.task.ExportExportingTask;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.utframe.TestWithFeService;
@@ -171,14 +172,14 @@ public class SessionVariablesTest extends TestWithFeService {
 
             ExportStmt exportStmt = (ExportStmt)
                     parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO 
\"file:///tmp/test_t1\"", connectContext);
-            ExportJob job = new ExportJob(1234);
-            job.setJob(exportStmt);
+            ExportJob job = exportStmt.getExportJob();
+            job.setId(1234);
 
             new Expectations(job) {
                 {
                     job.getState();
                     minTimes = 0;
-                    result = ExportJob.JobState.EXPORTING;
+                    result = ExportJobState.EXPORTING;
                 }
             };
 
@@ -201,14 +202,14 @@ public class SessionVariablesTest extends TestWithFeService {
 
             ExportStmt exportStmt = (ExportStmt)
                     parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO 
\"file:///tmp/test_t1\"", connectContext);
-            ExportJob job = new ExportJob(1234);
-            job.setJob(exportStmt);
+            ExportJob job = exportStmt.getExportJob();
+            job.setId(1234);
 
             new Expectations(job) {
                 {
                     job.getState();
                     minTimes = 0;
-                    result = ExportJob.JobState.EXPORTING;
+                    result = ExportJobState.EXPORTING;
                 }
             };
 
diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy
index 95805db491..162b63065e 100644
--- a/regression-test/suites/export_p0/test_export_basic.groovy
+++ b/regression-test/suites/export_p0/test_export_basic.groovy
@@ -69,7 +69,8 @@ suite("test_export_basic", "p0") {
             PARTITION between_20_70 VALUES [("20"),("70")),
             PARTITION more_than_70 VALUES LESS THAN ("151")
         )
-        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        DISTRIBUTED BY HASH(id) BUCKETS 3
+        PROPERTIES("replication_num" = "1");
     """
     StringBuilder sb = new StringBuilder()
     int i = 1
diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
index 205b1ffd71..3be2794cbd 100644
--- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy
+++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
@@ -37,7 +37,8 @@ suite("test_export_with_hdfs", "p2") {
             PARTITION between_20_70 VALUES [("20"),("70")),
             PARTITION more_than_70 VALUES LESS THAN ("151")
         )
-        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        DISTRIBUTED BY HASH(id) BUCKETS 3
+        PROPERTIES("replication_num" = "1");
     """
     StringBuilder sb = new StringBuilder()
     int i = 1
@@ -62,8 +63,9 @@ suite("test_export_with_hdfs", "p2") {
             if (res[0][2] == "FINISHED") {
                 def json = parseJson(res[0][11])
                 assert json instanceof List
-                assertEquals("1", json.fileNumber[0])
-                return json.url[0];
+                assertEquals("1", json.fileNumber[0][0])
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
             } else if (res[0][2] == "CANCELLED") {
                 throw new IllegalStateException("""export failed: ${res[0][10]}""")
             } else {
diff --git a/regression-test/suites/export_p2/test_export_with_s3.groovy b/regression-test/suites/export_p2/test_export_with_s3.groovy
index a26dde3238..82aa831cdd 100644
--- a/regression-test/suites/export_p2/test_export_with_s3.groovy
+++ b/regression-test/suites/export_p2/test_export_with_s3.groovy
@@ -38,7 +38,8 @@ suite("test_export_with_s3", "p2") {
             PARTITION between_20_70 VALUES [("20"),("70")),
             PARTITION more_than_70 VALUES LESS THAN ("151")
         )
-        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+        DISTRIBUTED BY HASH(id) BUCKETS 3
+        PROPERTIES("replication_num" = "1");
     """
     StringBuilder sb = new StringBuilder()
     int i = 1
@@ -63,9 +64,9 @@ suite("test_export_with_s3", "p2") {
             if (res[0][2] == "FINISHED") {
                 def json = parseJson(res[0][11])
                 assert json instanceof List
-                assertEquals("1", json.fileNumber[0])
-                log.info("outfile_path: ${json.url[0]}")
-                return json.url[0];
+                assertEquals("1", json.fileNumber[0][0])
+                log.info("outfile_path: ${json.url[0][0]}")
+                return json.url[0][0];
             } else if (res[0][2] == "CANCELLED") {
                 throw new IllegalStateException("""export failed: ${res[0][10]}""")
             } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
