This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 10abbd2b62 [Feature](Export) support parallel export job using Job Schedule (#22854)
10abbd2b62 is described below

commit 10abbd2b620cf4a39df734d8471f87ea1d58f906
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Fri Aug 18 22:24:42 2023 +0800

    [Feature](Export) support parallel export job using Job Schedule (#22854)
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../apache/doris/analysis/CancelExportStmt.java    |  12 +-
 .../java/org/apache/doris/analysis/ExportStmt.java | 230 ++++---
 .../org/apache/doris/analysis/OutFileClause.java   |   2 +-
 .../org/apache/doris/analysis/ShowExportStmt.java  |   8 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   2 +-
 .../org/apache/doris/common/proc/JobsProcDir.java  |  18 +-
 .../org/apache/doris/journal/JournalEntity.java    |   3 +-
 .../main/java/org/apache/doris/load/ExportJob.java | 702 +++++++++------------
 .../java/org/apache/doris/load/ExportJobState.java |  46 ++
 .../apache/doris/load/ExportJobStateTransfer.java  |  88 +++
 .../main/java/org/apache/doris/load/ExportMgr.java | 110 ++--
 .../org/apache/doris/load/ExportTaskExecutor.java  | 171 +++++
 .../java/org/apache/doris/load/OutfileInfo.java    |  37 ++
 .../java/org/apache/doris/persist/EditLog.java     |   8 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |   6 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   3 +-
 .../org/apache/doris/task/ExportExportingTask.java |  24 +-
 .../doris/analysis/CancelExportStmtTest.java       |  16 +-
 .../org/apache/doris/qe/SessionVariablesTest.java  |  13 +-
 .../suites/export_p0/test_export_basic.groovy      |   3 +-
 .../suites/export_p2/test_export_with_hdfs.groovy  |   8 +-
 .../suites/export_p2/test_export_with_s3.groovy    |   9 +-
 23 files changed, 906 insertions(+), 618 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2819439c15..c2dc625829 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2102,6 +2102,11 @@ public class Config extends ConfigBase {
             "The maximum parallelism allowed by Export job"})
     public static int maximum_parallelism_of_export_job = 50;
 
+    @ConfField(mutable = true, description = {
+            "ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
+            "The maximum number of tablets allowed by an OutfileStatement in an ExportExecutorTask"})
+    public static int maximum_tablets_of_outfile_in_export = 10;
+
     @ConfField(mutable = true, description = {
             "是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
             "Whether to use mysql's bigint type to return Doris's largeint type"})
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
index ca55761420..cdd2ecf73c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
@@ -21,8 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.ExportJob;
-import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJobState;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -83,11 +82,10 @@ public class
CancelExportStmt extends DdlStmt { throw new AnalysisException("Only label can use like"); } state = inputValue; - ExportJob.JobState jobState = ExportJob.JobState.valueOf(state); - if (jobState != ExportJob.JobState.PENDING - && jobState != JobState.IN_QUEUE - && jobState != ExportJob.JobState.EXPORTING) { - throw new AnalysisException("Only support PENDING/IN_QUEUE/EXPORTING, invalid state: " + state); + ExportJobState jobState = ExportJobState.valueOf(state); + if (jobState != ExportJobState.PENDING + && jobState != ExportJobState.EXPORTING) { + throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 930c3121b9..69f4d7174f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -17,13 +17,13 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; @@ -32,6 +32,7 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.URI; import org.apache.doris.common.util.Util; +import org.apache.doris.load.ExportJob; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -39,25 +40,26 @@ import org.apache.doris.qe.VariableMgr; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import lombok.Getter; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; // EXPORT statement, export data to dirs by broker. 
// // syntax: -// EXPORT TABLE tablename [PARTITION (name1[, ...])] +// EXPORT TABLE table_name [PARTITION (name1[, ...])] // TO 'export_target_path' // [PROPERTIES("key"="value")] // BY BROKER 'broker_name' [( $broker_attrs)] +@Getter public class ExportStmt extends StatementBase { - private static final Logger LOG = LogManager.getLogger(ExportStmt.class); public static final String PARALLELISM = "parallelism"; public static final String LABEL = "label"; @@ -106,6 +108,8 @@ public class ExportStmt extends StatementBase { private UserIdentity userIdentity; + private ExportJob exportJob; + public ExportStmt(TableRef tableRef, Expr whereExpr, String path, Map<String, String> properties, BrokerDesc brokerDesc) { this.tableRef = tableRef; @@ -118,75 +122,15 @@ public class ExportStmt extends StatementBase { this.columnSeparator = DEFAULT_COLUMN_SEPARATOR; this.lineDelimiter = DEFAULT_LINE_DELIMITER; this.columns = DEFAULT_COLUMNS; - if (ConnectContext.get() != null) { - this.sessionVariables = ConnectContext.get().getSessionVariable(); - } else { - this.sessionVariables = VariableMgr.getDefaultSessionVariable(); - } - } - - public String getColumns() { - return columns; - } - - public TableName getTblName() { - return tblName; - } - - public List<String> getPartitions() { - return partitionStringNames; - } - - public Expr getWhereExpr() { - return whereExpr; - } - - public String getPath() { - return path; - } - - public BrokerDesc getBrokerDesc() { - return brokerDesc; - } - - public String getColumnSeparator() { - return this.columnSeparator; - } - - public String getLineDelimiter() { - return this.lineDelimiter; - } - - public TableRef getTableRef() { - return this.tableRef; - } - - public String getFormat() { - return format; - } - - public String getLabel() { - return label; - } - - public SessionVariable getSessionVariables() { - return sessionVariables; - } - public String getQualifiedUser() { - return qualifiedUser; - } - - public UserIdentity getUserIdentity() { - return this.userIdentity; + Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable( + ConnectContext.get().getSessionVariable()); + this.sessionVariables = optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable()); } @Override public boolean needAuditEncryption() { - if (brokerDesc != null) { - return true; - } - return false; + return brokerDesc != null; } @Override @@ -197,16 +141,17 @@ public class ExportStmt extends StatementBase { Preconditions.checkNotNull(tableRef); tableRef.analyze(analyzer); - this.tblName = tableRef.getName(); // disallow external catalog + tblName = tableRef.getName(); Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName()); - PartitionNames partitionNames = tableRef.getPartitionNames(); - if (partitionNames != null) { - if (partitionNames.isTemp()) { + // get partitions name + Optional<PartitionNames> optionalPartitionNames = Optional.ofNullable(tableRef.getPartitionNames()); + if (optionalPartitionNames.isPresent()) { + if (optionalPartitionNames.get().isTemp()) { throw new AnalysisException("Do not support exporting temporary partitions"); } - partitionStringNames = partitionNames.getPartitionNames(); + partitionStringNames = optionalPartitionNames.get().getPartitionNames(); } // check auth @@ -222,7 +167,7 @@ public class ExportStmt extends StatementBase { userIdentity = ConnectContext.get().getCurrentUserIdentity(); // check table && partitions whether exist - checkTable(analyzer.getEnv()); + checkPartitions(analyzer.getEnv()); // check 
broker whether exist if (brokerDesc == null) { @@ -232,28 +177,86 @@ public class ExportStmt extends StatementBase { // check path is valid path = checkPath(path, brokerDesc.getStorageType()); if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { - if (!analyzer.getEnv().getBrokerMgr().containsBroker(brokerDesc.getName())) { + BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr(); + if (!brokerMgr.containsBroker(brokerDesc.getName())) { throw new AnalysisException("broker " + brokerDesc.getName() + " does not exist"); } - - FsBroker broker = analyzer.getEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName()); - if (broker == null) { + if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) { throw new AnalysisException("failed to get alive broker"); } } // check properties checkProperties(properties); + + // create job and analyze job + setJob(); + exportJob.analyze(); } - private void checkTable(Env env) throws AnalysisException { + private void setJob() throws UserException { + exportJob = new ExportJob(); + + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb()); + exportJob.setDbId(db.getId()); + exportJob.setTableName(this.tblName); + exportJob.setExportTable(db.getTableOrDdlException(this.tblName.getTbl())); + exportJob.setTableId(db.getTableOrDdlException(this.tblName.getTbl()).getId()); + exportJob.setTableRef(this.tableRef); + + // set partitions + exportJob.setPartitionNames(this.partitionStringNames); + + // set where expr + exportJob.setWhereExpr(this.whereExpr); + + // set path + exportJob.setExportPath(this.path); + + // set properties + exportJob.setLabel(this.label); + exportJob.setColumnSeparator(this.columnSeparator); + exportJob.setLineDelimiter(this.lineDelimiter); + exportJob.setFormat(this.format); + exportJob.setColumns(this.columns); + exportJob.setParallelism(this.parallelism); + exportJob.setMaxFileSize(this.maxFileSize); + exportJob.setDeleteExistingFiles(this.deleteExistingFiles); + + if (!Strings.isNullOrEmpty(this.columns)) { + Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); + exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase())); + } + + // set broker desc + exportJob.setBrokerDesc(this.brokerDesc); + + // set sessions + exportJob.setQualifiedUser(this.qualifiedUser); + exportJob.setUserIdentity(this.userIdentity); + exportJob.setSessionVariables(this.sessionVariables); + exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS()); + + exportJob.setSql(this.toSql()); + exportJob.setOrigStmt(this.getOrigStmt()); + } + + // check partitions specified by user are belonged to the table. 
+ private void checkPartitions(Env env) throws AnalysisException { + if (partitionStringNames == null) { + return; + } + + if (partitionStringNames.size() > Config.maximum_number_of_export_partitions) { + throw new AnalysisException("The partitions number of this export job is larger than the maximum number" + + " of partitions allowed by an export job"); + } + Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb()); Table table = db.getTableOrAnalysisException(tblName.getTbl()); table.readLock(); try { - if (partitionStringNames == null) { - return; - } + // check table if (!table.isPartitioned()) { throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned."); } @@ -270,13 +273,14 @@ public class ExportStmt extends StatementBase { case VIEW: default: throw new AnalysisException("Table[" + tblName.getTbl() + "] is " - + tblType.toString() + " type, do not support EXPORT."); + + tblType + " type, do not support EXPORT."); } for (String partitionName : partitionStringNames) { Partition partition = table.getPartition(partitionName); if (partition == null) { - throw new AnalysisException("Partition [" + partitionName + "] does not exist"); + throw new AnalysisException("Partition [" + partitionName + "] does not exist " + + "in Table[" + tblName.getTbl() + "]"); } } } finally { @@ -286,13 +290,17 @@ public class ExportStmt extends StatementBase { public static String checkPath(String path, StorageBackend.StorageType type) throws AnalysisException { if (Strings.isNullOrEmpty(path)) { - throw new AnalysisException("No dest path specified."); + throw new AnalysisException("No destination path specified."); } URI uri = URI.create(path); String schema = uri.getScheme(); + if (schema == null) { + throw new AnalysisException( + "Invalid export path, there is no schema of URI found. please check your path."); + } if (type == StorageBackend.StorageType.BROKER) { - if (schema == null || (!schema.equalsIgnoreCase("bos") + if (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs") && !schema.equalsIgnoreCase("hdfs") && !schema.equalsIgnoreCase("ofs") @@ -302,23 +310,17 @@ public class ExportStmt extends StatementBase { && !schema.equalsIgnoreCase("cosn") && !schema.equalsIgnoreCase("gfs") && !schema.equalsIgnoreCase("jfs") - && !schema.equalsIgnoreCase("gs"))) { + && !schema.equalsIgnoreCase("gs")) { throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://'," + " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://' or 'jfs://' path."); } - } else if (type == StorageBackend.StorageType.S3) { - if (schema == null || !schema.equalsIgnoreCase("s3")) { - throw new AnalysisException("Invalid export path. please use valid 's3://' path."); - } - } else if (type == StorageBackend.StorageType.HDFS) { - if (schema == null || !schema.equalsIgnoreCase("hdfs")) { - throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path."); - } - } else if (type == StorageBackend.StorageType.LOCAL) { - if (schema != null && !schema.equalsIgnoreCase("file")) { - throw new AnalysisException( + } else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) { + throw new AnalysisException("Invalid export path. please use valid 's3://' path."); + } else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")) { + throw new AnalysisException("Invalid export path. 
please use valid 'HDFS://' path."); + } else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) { + throw new AnalysisException( "Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path."); - } } return path; } @@ -326,7 +328,7 @@ public class ExportStmt extends StatementBase { private void checkProperties(Map<String, String> properties) throws UserException { for (String key : properties.keySet()) { if (!PROPERTIES_SET.contains(key.toLowerCase())) { - throw new DdlException("Invalid property key: '" + key + "'"); + throw new UserException("Invalid property key: [" + key + "]"); } } @@ -348,20 +350,24 @@ public class ExportStmt extends StatementBase { // parallelism String parallelismString = properties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM); - parallelism = Integer.parseInt(parallelismString); + try { + this.parallelism = Integer.parseInt(parallelismString); + } catch (NumberFormatException e) { + throw new UserException("The value of parallelism is invalid!"); + } // max_file_size this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""); this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, ""); + // label if (properties.containsKey(LABEL)) { FeNameFormat.checkLabel(properties.get(LABEL)); + this.label = properties.get(LABEL); } else { // generate a random label - String label = "export_" + UUID.randomUUID(); - properties.put(LABEL, label); + this.label = "export_" + UUID.randomUUID(); } - label = properties.get(LABEL); } @Override @@ -408,16 +414,4 @@ public class ExportStmt extends StatementBase { public String toString() { return toSql(); } - - public String getMaxFileSize() { - return maxFileSize; - } - - public String getDeleteExistingFiles() { - return deleteExistingFiles; - } - - public Integer getParallelNum() { - return parallelism; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 24ed977a7d..d2f760d5b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -255,7 +255,7 @@ public class OutFileClause { if (brokerDesc != null && isLocalOutput) { throw new AnalysisException("No need to specify BROKER properties in OUTFILE clause for local file output"); } else if (brokerDesc == null && !isLocalOutput) { - throw new AnalysisException("Must specify BROKER properties in OUTFILE clause"); + throw new AnalysisException("Must specify BROKER properties or current local file path in OUTFILE clause"); } isAnalyzed = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java index d826da8340..eb2b548b92 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java @@ -27,7 +27,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.proc.ExportProcNode; import org.apache.doris.common.util.OrderByPair; -import org.apache.doris.load.ExportJob.JobState; +import org.apache.doris.load.ExportJobState; import org.apache.doris.qe.ShowResultSetMetaData; import com.google.common.base.Strings; @@ -54,7 +54,7 @@ public class ShowExportStmt extends ShowStmt { private boolean 
isLabelUseLike = false; private String stateValue = null; - private JobState jobState; + private ExportJobState jobState; private ArrayList<OrderByPair> orderByPairs; @@ -84,7 +84,7 @@ public class ShowExportStmt extends ShowStmt { return this.jobId; } - public JobState getJobState() { + public ExportJobState getJobState() { if (Strings.isNullOrEmpty(stateValue)) { return null; } @@ -152,7 +152,7 @@ public class ShowExportStmt extends ShowStmt { if (!Strings.isNullOrEmpty(value)) { stateValue = value.toUpperCase(); try { - jobState = JobState.valueOf(stateValue); + jobState = ExportJobState.valueOf(stateValue); valid = true; } catch (IllegalArgumentException e) { LOG.warn("illegal state argument in export stmt. stateValue={}, error={}", stateValue, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 0611016964..041db9b430 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3755,7 +3755,7 @@ public class Env { return timerJobManager; } - public TransientTaskManager getMemoryTaskManager() { + public TransientTaskManager getTransientTaskManager() { return transientTaskManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java index b81561399d..781ee38d52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java @@ -22,7 +22,7 @@ import org.apache.doris.alter.SchemaChangeHandler; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; -import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.loadv2.LoadManager; @@ -147,10 +147,10 @@ public class JobsProcDir implements ProcDirInterface { // export ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); - pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING, dbId); - runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING, dbId); - finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED, dbId); - cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED, dbId); + pendingNum = exportMgr.getJobNum(ExportJobState.PENDING, dbId); + runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING, dbId); + finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED, dbId); + cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED, dbId); totalNum = pendingNum + runningNum + finishedNum + cancelledNum; result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); @@ -209,10 +209,10 @@ public class JobsProcDir implements ProcDirInterface { // export ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); - pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING); - runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING); - finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED); - cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED); + pendingNum = exportMgr.getJobNum(ExportJobState.PENDING); + runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING); + finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED); + cancelledNum = 
exportMgr.getJobNum(ExportJobState.CANCELLED); totalNum = pendingNum + runningNum + finishedNum + cancelledNum; result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(), cancelledNum.toString(), totalNum.toString())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index e155751d5b..d8b8c62bd5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -47,6 +47,7 @@ import org.apache.doris.ha.MasterInfo; import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteInfo; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobStateTransfer; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord; @@ -319,7 +320,7 @@ public class JournalEntity implements Writable { isRead = true; break; case OperationType.OP_EXPORT_UPDATE_STATE: - data = ExportJob.StateTransfer.read(in); + data = ExportJobStateTransfer.read(in); isRead = true; break; case OperationType.OP_FINISH_DELETE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 1527ba1ce4..bd975bb6fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -48,17 +48,17 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.registry.ExportTaskRegister; +import org.apache.doris.scheduler.registry.TransientTaskRegister; import org.apache.doris.task.ExportExportingTask; import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; @@ -66,7 +66,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; -import lombok.Getter; +import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,27 +81,21 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; -// NOTE: we must be carefully if we send next request -// as soon as receiving one instance's report from one BE, -// because we may change job's member concurrently. 
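
The rewritten ExportJob that follows splits one export into `parallelism` ExportTaskExecutors, and each executor's tablet slice into outfile statements of at most Config.maximum_tablets_of_outfile_in_export tablets. Below is a minimal, self-contained sketch of that two-level split; the class, method, and variable names here are illustrative, not the patch's own:

    import java.util.ArrayList;
    import java.util.List;

    public class TabletSplitSketch {
        // First level: split tabletIds into `parallelism` near-equal slices,
        // spreading the remainder one-per-slice from the front.
        // Second level: chunk each slice so that a single outfile statement
        // never covers more than maxTabletsPerOutfile tablets.
        static List<List<List<Long>>> split(List<Long> tabletIds, int parallelism,
                int maxTabletsPerOutfile) {
            List<List<List<Long>>> perExecutor = new ArrayList<>();
            if (tabletIds.isEmpty()) {
                return perExecutor;
            }
            // If there are fewer tablets than the requested parallelism, shrink it.
            int realParallelism = Math.min(parallelism, tabletIds.size());
            int base = tabletIds.size() / realParallelism;
            int remainder = tabletIds.size() % realParallelism;
            int start = 0;
            for (int i = 0; i < realParallelism; i++) {
                int len = base + (i < remainder ? 1 : 0);
                List<Long> slice = tabletIds.subList(start, start + len);
                start += len;
                List<List<Long>> outfileChunks = new ArrayList<>();
                for (int j = 0; j < slice.size(); j += maxTabletsPerOutfile) {
                    int end = Math.min(j + maxTabletsPerOutfile, slice.size());
                    outfileChunks.add(new ArrayList<>(slice.subList(j, end)));
                }
                perExecutor.add(outfileChunks);
            }
            return perExecutor;
        }
    }

For example, 23 tablets at parallelism 5 and chunk size 10 yield executor slices of 5, 5, 5, 4 and 4 tablets, one outfile chunk each — the same distribution that splitTablets() and getTableRefListPerParallel() produce in the hunks below.
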
+@Data public class ExportJob implements Writable { private static final Logger LOG = LogManager.getLogger(ExportJob.class); private static final String BROKER_PROPERTY_PREFIXES = "broker."; - public enum JobState { - PENDING, - IN_QUEUE, - EXPORTING, - FINISHED, - CANCELLED, - } + private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export; + + public static final TransientTaskRegister register = new ExportTaskRegister( + Env.getCurrentEnv().getTransientTaskManager()); @SerializedName("id") private long id; - @SerializedName("queryId") - private String queryId; @SerializedName("label") private String label; @SerializedName("dbId") @@ -121,7 +115,7 @@ public class ExportJob implements Writable { @SerializedName("tableName") private TableName tableName; @SerializedName("state") - private JobState state; + private ExportJobState state; @SerializedName("createTimeMs") private long createTimeMs; // this is the origin stmt of ExportStmt, we use it to persist where expr of Export job, @@ -153,15 +147,19 @@ public class ExportJob implements Writable { // progress has two functions at EXPORTING stage: // 1. when progress < 100, it indicates exporting // 2. set progress = 100 ONLY when exporting progress is completely done + @SerializedName("progress") private int progress; + @SerializedName("tabletsNum") + private Integer tabletsNum; + private TableRef tableRef; private Expr whereExpr; private String sql = ""; - private Integer parallelNum; + private Integer parallelism; public Map<String, Long> getPartitionToVersion() { return partitionToVersion; @@ -170,9 +168,11 @@ public class ExportJob implements Writable { private Map<String, Long> partitionToVersion = Maps.newHashMap(); // The selectStmt is sql 'select ... into outfile ...' - @Getter + // TODO(ftw): delete private List<SelectStmt> selectStmtList = Lists.newArrayList(); + private List<List<SelectStmt>> selectStmtListPerParallel = Lists.newArrayList(); + private List<StmtExecutor> stmtExecutorList; private List<String> exportColumns = Lists.newArrayList(); @@ -188,16 +188,21 @@ public class ExportJob implements Writable { private ExportExportingTask task; - private List<TScanRangeLocations> tabletLocations = Lists.newArrayList(); // backend_address => snapshot path private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList(); + private List<ExportTaskExecutor> jobExecutorList; + + private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>(); + + private Integer finishedTaskCount = 0; + private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList(); + public ExportJob() { this.id = -1; - this.queryId = ""; this.dbId = -1; this.tableId = -1; - this.state = JobState.PENDING; + this.state = ExportJobState.PENDING; this.progress = 0; this.createTimeMs = System.currentTimeMillis(); this.startTimeMs = -1; @@ -215,55 +220,37 @@ public class ExportJob implements Writable { this.id = jobId; } - public void setJob(ExportStmt stmt) throws UserException { - String dbName = stmt.getTblName().getDb(); - Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName); - Preconditions.checkNotNull(stmt.getBrokerDesc()); - this.brokerDesc = stmt.getBrokerDesc(); - this.columnSeparator = stmt.getColumnSeparator(); - this.lineDelimiter = stmt.getLineDelimiter(); - this.label = stmt.getLabel(); - this.queryId = ConnectContext.get() != null ? 
DebugUtil.printId(ConnectContext.get().queryId()) : "N/A"; - String path = stmt.getPath(); - Preconditions.checkArgument(!Strings.isNullOrEmpty(path)); - this.whereExpr = stmt.getWhereExpr(); - this.parallelNum = stmt.getParallelNum(); - this.exportPath = path; - this.sessionVariables = stmt.getSessionVariables(); - this.timeoutSecond = sessionVariables.getQueryTimeoutS(); - - this.qualifiedUser = stmt.getQualifiedUser(); - this.userIdentity = stmt.getUserIdentity(); - this.format = stmt.getFormat(); - this.maxFileSize = stmt.getMaxFileSize(); - this.deleteExistingFiles = stmt.getDeleteExistingFiles(); - this.partitionNames = stmt.getPartitions(); - - this.exportTable = db.getTableOrDdlException(stmt.getTblName().getTbl()); - this.columns = stmt.getColumns(); - this.tableRef = stmt.getTableRef(); - if (!Strings.isNullOrEmpty(this.columns)) { - Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); - this.exportColumns = split.splitToList(stmt.getColumns().toLowerCase()); - } + /** + * For an ExportJob: + * The ExportJob is divided into multiple 'ExportTaskExecutor' + * according to the 'parallelism' set by the user. + * The tablets which will be exported by this ExportJob are divided into 'parallelism' copies, + * and each ExportTaskExecutor is responsible for a list of tablets. + * The tablets responsible for an ExportTaskExecutor will be assigned to multiple OutfileStmt + * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'. + * + * @throws UserException + */ + public void analyze() throws UserException { exportTable.readLock(); try { - this.dbId = db.getId(); - this.tableId = exportTable.getId(); - this.tableName = stmt.getTblName(); - if (selectStmtList.isEmpty()) { - // This scenario is used for 'EXPORT TABLE tbl INTO PATH' - // we need generate Select Statement - generateQueryStmt(stmt); - } + // generateQueryStmtOld + generateQueryStmt(); } finally { exportTable.readUnlock(); } - this.sql = stmt.toSql(); - this.origStmt = stmt.getOrigStmt(); + generateExportJobExecutor(); + } + + public void generateExportJobExecutor() { + jobExecutorList = Lists.newArrayList(); + for (List<SelectStmt> selectStmts : selectStmtListPerParallel) { + ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this); + jobExecutorList.add(executor); + } } - private void generateQueryStmt(ExportStmt stmt) throws UserException { + private void generateQueryStmtOld() throws UserException { SelectList list = new SelectList(); if (exportColumns.isEmpty()) { list.addItem(SelectListItem.createStarItem(this.tableName)); @@ -278,7 +265,16 @@ public class ExportJob implements Writable { } } - ArrayList<ArrayList<TableRef>> tableRefListPerQuery = splitTablets(stmt); + ArrayList<ArrayList<Long>> tabletsListPerQuery = splitTablets(); + + ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList(); + for (ArrayList<Long> tabletsList : tabletsListPerQuery) { + TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tabletsList, + this.tableRef.getTableSample(), this.tableRef.getCommonHints()); + ArrayList<TableRef> tableRefList = Lists.newArrayList(); + tableRefList.add(tblRef); + tableRefListPerQuery.add(tableRefList); + } LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size()); if (LOG.isDebugEnabled()) { @@ -306,30 +302,104 @@ public class ExportJob implements Writable { } } - private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) throws UserException { + /** + * Generate outfile select stmt + * 
@throws UserException + */ + private void generateQueryStmt() throws UserException { + SelectList list = new SelectList(); + if (exportColumns.isEmpty()) { + list.addItem(SelectListItem.createStarItem(this.tableName)); + } else { + for (Column column : exportTable.getBaseSchema()) { + String colName = column.getName().toLowerCase(); + if (exportColumns.contains(colName)) { + SlotRef slotRef = new SlotRef(this.tableName, colName); + SelectListItem selectListItem = new SelectListItem(slotRef, null); + list.addItem(selectListItem); + } + } + } + + ArrayList<ArrayList<TableRef>> tableRefListPerParallel = getTableRefListPerParallel(); + LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, tableRefListPerParallel.size()); + + // debug LOG output + if (LOG.isDebugEnabled()) { + for (int i = 0; i < tableRefListPerParallel.size(); i++) { + LOG.debug("ExportTaskExecutor {} is responsible for tablets:", i); + for (TableRef tableRef : tableRefListPerParallel.get(i)) { + LOG.debug("Tablet id: [{}]", tableRef.getSampleTabletIds()); + } + } + } + + // generate 'select..outfile..' statement + for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) { + List<SelectStmt> selectStmtLists = Lists.newArrayList(); + for (TableRef tableRef : tableRefList) { + ArrayList<TableRef> tmpTableRefList = Lists.newArrayList(tableRef); + FromClause fromClause = new FromClause(tmpTableRefList); + // generate outfile clause + OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); + SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, + null, null, LimitElement.NO_LIMIT); + selectStmt.setOutFileClause(outfile); + selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); + selectStmtLists.add(selectStmt); + } + selectStmtListPerParallel.add(selectStmtLists); + } + + // debug LOG output + if (LOG.isDebugEnabled()) { + for (int i = 0; i < selectStmtListPerParallel.size(); ++i) { + LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i); + for (SelectStmt outfile : selectStmtListPerParallel.get(i)) { + LOG.debug("outfile sql: [{}]", outfile.toSql()); + } + } + } + } + + private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws UserException { + ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets(); + + ArrayList<ArrayList<TableRef>> tableRefListPerParallel = Lists.newArrayList(); + for (ArrayList<Long> tabletsList : tabletsListPerParallel) { + ArrayList<TableRef> tableRefList = Lists.newArrayList(); + for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) { + int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size() + ? 
i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size(); + ArrayList<Long> tablets = new ArrayList<>(tabletsList.subList(i, end)); + TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), + this.tableRef.getPartitionNames(), tablets, + this.tableRef.getTableSample(), this.tableRef.getCommonHints()); + tableRefList.add(tblRef); + } + tableRefListPerParallel.add(tableRefList); + } + return tableRefListPerParallel; + } + + private ArrayList<ArrayList<Long>> splitTablets() throws UserException { // get tablets - Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb()); - OlapTable table = db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl()); + Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb()); + OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl()); List<Long> tabletIdList = Lists.newArrayList(); table.readLock(); try { - Collection<Partition> partitions = new ArrayList<Partition>(); + final Collection<Partition> partitions = new ArrayList<Partition>(); // get partitions // user specifies partitions, already checked in ExportStmt if (this.partitionNames != null) { - if (partitionNames.size() > Config.maximum_number_of_export_partitions) { - throw new UserException("The partitions number of this export job is larger than the maximum number" - + " of partitions allowed by a export job"); - } - for (String partName : this.partitionNames) { - partitions.add(table.getPartition(partName)); - } + this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName))); } else { if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) { throw new UserException("The partitions number of this export job is larger than the maximum number" + " of partitions allowed by a export job"); } - partitions = table.getPartitions(); + partitions.addAll(table.getPartitions()); } // get tablets @@ -344,38 +414,34 @@ public class ExportJob implements Writable { } Integer tabletsAllNum = tabletIdList.size(); - Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum; - Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerQuery * this.parallelNum; - - Integer start = 0; - - ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList(); - - int outfileNum = this.parallelNum; - if (tabletsAllNum < this.parallelNum) { - outfileNum = tabletsAllNum; + tabletsNum = tabletsAllNum; + Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism; + Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism; + + ArrayList<ArrayList<Long>> tabletsListPerParallel = Lists.newArrayList(); + Integer realParallelism = this.parallelism; + if (tabletsAllNum < this.parallelism) { + realParallelism = tabletsAllNum; LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller than parallelism ({}), " - + "set parallelism to tablets num.", id, tabletsAllNum, this.parallelNum); + + "set parallelism to tablets num.", id, tabletsAllNum, this.parallelism); } - for (int i = 0; i < outfileNum; ++i) { - Integer tabletsNum = tabletsNumPerQuery; + Integer start = 0; + for (int i = 0; i < realParallelism; ++i) { + Integer tabletsNum = tabletsNumPerParallel; if (tabletsNumPerQueryRemainder > 0) { tabletsNum = tabletsNum + 1; --tabletsNumPerQueryRemainder; } ArrayList<Long> tablets = new ArrayList<>(tabletIdList.subList(start, start + tabletsNum)); start += 
tabletsNum; - TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tablets, - this.tableRef.getTableSample(), this.tableRef.getCommonHints()); - ArrayList<TableRef> tableRefList = Lists.newArrayList(); - tableRefList.add(tblRef); - tableRefListPerQuery.add(tableRefList); + + tabletsListPerParallel.add(tablets); } - return tableRefListPerQuery; + return tabletsListPerParallel; } private Map<String, String> convertOutfileProperties() { - Map<String, String> outfileProperties = Maps.newHashMap(); + final Map<String, String> outfileProperties = Maps.newHashMap(); // file properties if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) { @@ -393,9 +459,7 @@ public class ExportJob implements Writable { // outfile clause's broker properties need 'broker.' prefix if (brokerDesc.getStorageType() == StorageType.BROKER) { outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name", brokerDesc.getName()); - for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) { - outfileProperties.put(BROKER_PROPERTY_PREFIXES + kv.getKey(), kv.getValue()); - } + brokerDesc.getProperties().forEach((k, v) -> outfileProperties.put(BROKER_PROPERTY_PREFIXES + k, v)); } else { for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) { outfileProperties.put(kv.getKey(), kv.getValue()); @@ -404,131 +468,25 @@ public class ExportJob implements Writable { return outfileProperties; } - public String getColumns() { - return columns; - } - - public long getId() { - return id; - } - - public long getDbId() { - return dbId; - } - - public long getTableId() { - return this.tableId; - } - - public Expr getWhereExpr() { - return whereExpr; - } - - public synchronized JobState getState() { + public synchronized ExportJobState getState() { return state; } - public BrokerDesc getBrokerDesc() { - return brokerDesc; - } - - public void setBrokerDesc(BrokerDesc brokerDesc) { - this.brokerDesc = brokerDesc; - } - - public String getExportPath() { - return exportPath; - } - - public String getColumnSeparator() { - return this.columnSeparator; - } - - public String getLineDelimiter() { - return this.lineDelimiter; - } - - public int getTimeoutSecond() { - return timeoutSecond; - } - - public String getFormat() { - return format; - } - - public String getMaxFileSize() { - return maxFileSize; - } - - public String getDeleteExistingFiles() { - return deleteExistingFiles; - } - - public String getQualifiedUser() { - return qualifiedUser; - } - - public UserIdentity getUserIdentity() { - return userIdentity; - } - - public List<String> getPartitions() { - return partitionNames; - } - - public int getProgress() { - return progress; - } - - public void setProgress(int progress) { - this.progress = progress; - } - - public long getCreateTimeMs() { - return createTimeMs; - } - - public long getStartTimeMs() { - return startTimeMs; - } - - public void setStartTimeMs(long startTimeMs) { - this.startTimeMs = startTimeMs; - } - - public long getFinishTimeMs() { - return finishTimeMs; - } - - public void setFinishTimeMs(long finishTimeMs) { - this.finishTimeMs = finishTimeMs; - } - - public ExportFailMsg getFailMsg() { - return failMsg; - } - - public void setFailMsg(ExportFailMsg failMsg) { - this.failMsg = failMsg; - } - - public String getOutfileInfo() { - return outfileInfo; - } - - public void setOutfileInfo(String outfileInfo) { - this.outfileInfo = outfileInfo; + private void setExportJobState(ExportJobState newState) { + this.state = 
newState; } - + // TODO(ftw): delete public synchronized Thread getDoExportingThread() { return doExportingThread; } + // TODO(ftw): delete public synchronized void setDoExportingThread(Thread isExportingThread) { this.doExportingThread = isExportingThread; } + // TODO(ftw): delete public synchronized void setStmtExecutor(int idx, StmtExecutor executor) { this.stmtExecutorList.set(idx, executor); } @@ -537,51 +495,20 @@ public class ExportJob implements Writable { return this.stmtExecutorList.get(idx); } - public List<TScanRangeLocations> getTabletLocations() { - return tabletLocations; - } - - public List<Pair<TNetworkAddress, String>> getSnapshotPaths() { - return this.snapshotPaths; - } - - public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) { - this.snapshotPaths.add(snapshotPath); - } - - public String getSql() { - return sql; - } - - public ExportExportingTask getTask() { - return task; - } - - public void setTask(ExportExportingTask task) { - this.task = task; - } - - public TableName getTableName() { - return tableName; - } - - public SessionVariable getSessionVariables() { - return sessionVariables; - } - + // TODO(ftw): delete public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { if (msg != null) { failMsg = new ExportFailMsg(type, msg); } // maybe user cancel this job - if (task != null && state == JobState.EXPORTING && stmtExecutorList != null) { + if (task != null && state == ExportJobState.EXPORTING && stmtExecutorList != null) { for (int idx = 0; idx < stmtExecutorList.size(); ++idx) { stmtExecutorList.get(idx).cancel(); } } - if (updateState(ExportJob.JobState.CANCELLED, false)) { + if (updateState(ExportJobState.CANCELLED, false)) { // release snapshot // Status releaseSnapshotStatus = releaseSnapshotPaths(); // if (!releaseSnapshotStatus.ok()) { @@ -592,23 +519,150 @@ public class ExportJob implements Writable { } } + public synchronized void updateExportJobState(ExportJobState newState, Long taskId, + List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException { + switch (newState) { + case PENDING: + throw new JobException("Can not update ExportJob state to 'PENDING', job id: [{}], task id: [{}]", + id, taskId); + case EXPORTING: + exportExportJob(); + break; + case CANCELLED: + cancelExportTask(type, msg); + break; + case FINISHED: + finishExportTask(taskId, outfileInfoList); + break; + default: + return; + } + } + + public void cancelReplayedExportJob(ExportFailMsg.CancelType type, String msg) { + setExportJobState(ExportJobState.CANCELLED); + failMsg = new ExportFailMsg(type, msg); + } + + private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws JobException { + if (getState() == ExportJobState.CANCELLED) { + return; + } + + if (getState() == ExportJobState.FINISHED) { + throw new JobException("Job {} has finished, can not been cancelled", id); + } + + if (getState() == ExportJobState.PENDING) { + startTimeMs = System.currentTimeMillis(); + } + + // we need cancel all task + taskIdToExecutor.keySet().forEach(id -> { + try { + register.cancelTask(id); + } catch (JobException e) { + LOG.warn("cancel export task {} exception: {}", id, e); + } + }); + + cancelExportJobUnprotected(type, msg); + } + + private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String msg) { + setExportJobState(ExportJobState.CANCELLED); + finishTimeMs = System.currentTimeMillis(); + failMsg = new ExportFailMsg(type, msg); + 
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED); + } + + // TODO(ftw): delete public synchronized boolean finish(List<OutfileInfo> outfileInfoList) { outfileInfo = GsonUtils.GSON.toJson(outfileInfoList); - if (updateState(ExportJob.JobState.FINISHED)) { + if (updateState(ExportJobState.FINISHED)) { return true; } return false; } - public synchronized boolean updateState(ExportJob.JobState newState) { + private void exportExportJob() { + // The first exportTaskExecutor will set state to EXPORTING, + // other exportTaskExecutors do not need to set up state. + if (getState() == ExportJobState.EXPORTING) { + return; + } + setExportJobState(ExportJobState.EXPORTING); + // if isReplay == true, startTimeMs will be read from LOG + startTimeMs = System.currentTimeMillis(); + } + + private void finishExportTask(Long taskId, List<OutfileInfo> outfileInfoList) throws JobException { + if (getState() == ExportJobState.CANCELLED) { + throw new JobException("Job [{}] has been cancelled, can not finish this task: {}", id, taskId); + } + + allOutfileInfo.add(outfileInfoList); + ++finishedTaskCount; + + // calculate progress + int tmpProgress = finishedTaskCount * 100 / jobExecutorList.size(); + if (finishedTaskCount * 100 / jobExecutorList.size() >= 100) { + progress = 99; + } else { + progress = tmpProgress; + } + + // if all task finished + if (finishedTaskCount == jobExecutorList.size()) { + finishExportJobUnprotected(); + } + } + + private void finishExportJobUnprotected() { + progress = 100; + setExportJobState(ExportJobState.FINISHED); + finishTimeMs = System.currentTimeMillis(); + outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo); + Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED); + } + + public void replayExportJobState(ExportJobState newState) { + switch (newState) { + // We do not persist EXPORTING state in new version of metadata, + // but EXPORTING state may still exist in older versions of metadata. + // So if isReplay == true and newState == EXPORTING, we set newState = CANCELLED. + case EXPORTING: + // We do not need IN_QUEUE state in new version of export + // but IN_QUEUE state may still exist in older versions of metadata. + // So if isReplay == true and newState == IN_QUEUE, we set newState = CANCELLED. + case IN_QUEUE: + newState = ExportJobState.CANCELLED; + break; + case PENDING: + case CANCELLED: + progress = 0; + break; + case FINISHED: + progress = 100; + break; + default: + Preconditions.checkState(false, "wrong job state: " + newState.name()); + break; + } + setExportJobState(newState); + } + + // TODO(ftw): delete + public synchronized boolean updateState(ExportJobState newState) { return this.updateState(newState, false); } - public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) { + // TODO(ftw): delete + public synchronized boolean updateState(ExportJobState newState, boolean isReplay) { // We do not persist EXPORTING state in new version of metadata, // but EXPORTING state may still exist in older versions of metadata. // So if isReplay == true and newState == EXPORTING, we just ignore this update. 
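
The progress arithmetic in finishExportTask() above is easy to miss: task callbacks report proportional progress but hold at 99, and only the job-level finish path sets 100. Condensed into a sketch (the helper name is hypothetical, not in the patch):

    // Progress after one more task finishes; 100 is reserved for
    // finishExportJobUnprotected(), which runs once the last task completes.
    static int progressAfterTaskFinish(int finishedTaskCount, int totalTasks) {
        int pct = finishedTaskCount * 100 / totalTasks;
        return pct >= 100 ? 99 : pct;
    }

For instance, with 4 executors: 3 finished gives 75; the 4th gives 99, after which the job itself flips to FINISHED and sets progress to 100.
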
- if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) { + if (isFinalState() || (isReplay && newState == ExportJobState.EXPORTING)) { return false; } state = newState; @@ -618,7 +672,7 @@ public class ExportJob implements Writable { progress = 0; break; case EXPORTING: - // if isReplay == true, startTimeMs will be read from log + // if isReplay == true, startTimeMs will be read from LOG if (!isReplay) { startTimeMs = System.currentTimeMillis(); } @@ -630,7 +684,7 @@ public class ExportJob implements Writable { progress = 100; break; case CANCELLED: - // if isReplay == true, finishTimeMs will be read from log + // if isReplay == true, finishTimeMs will be read from LOG if (!isReplay) { finishTimeMs = System.currentTimeMillis(); } @@ -640,27 +694,19 @@ public class ExportJob implements Writable { break; } // we only persist Pending/Cancel/Finish state - if (!isReplay && newState != JobState.IN_QUEUE && newState != JobState.EXPORTING) { + if (!isReplay && newState != ExportJobState.IN_QUEUE && newState != ExportJobState.EXPORTING) { Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState); } return true; } public synchronized boolean isFinalState() { - return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED; + return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED; } public boolean isExpired(long curTime) { return (curTime - createTimeMs) / 1000 > Config.history_job_keep_max_second - && (state == ExportJob.JobState.CANCELLED || state == ExportJob.JobState.FINISHED); - } - - public String getLabel() { - return label; - } - - public String getQueryId() { - return queryId; + && (state == ExportJobState.CANCELLED || state == ExportJobState.FINISHED); } @Override @@ -737,7 +783,7 @@ public class ExportJob implements Writable { } } - state = JobState.valueOf(Text.readString(in)); + state = ExportJobState.valueOf(Text.readString(in)); createTimeMs = in.readLong(); startTimeMs = in.readLong(); finishTimeMs = in.readLong(); @@ -793,132 +839,4 @@ public class ExportJob implements Writable { return false; } - - public boolean isReplayed() { - return isReplayed; - } - - // for only persist op when switching job state. 
- public static class StateTransfer implements Writable { - @SerializedName("jobId") - long jobId; - @SerializedName("state") - JobState state; - @SerializedName("startTimeMs") - private long startTimeMs; - @SerializedName("finishTimeMs") - private long finishTimeMs; - @SerializedName("failMsg") - private ExportFailMsg failMsg; - @SerializedName("outFileInfo") - private String outFileInfo; - - // used for reading from one log - public StateTransfer() { - this.jobId = -1; - this.state = JobState.CANCELLED; - this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, ""); - this.outFileInfo = ""; - } - - // used for persisting one log - public StateTransfer(long jobId, JobState state) { - this.jobId = jobId; - this.state = state; - ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId); - this.startTimeMs = job.getStartTimeMs(); - this.finishTimeMs = job.getFinishTimeMs(); - this.failMsg = job.getFailMsg(); - this.outFileInfo = job.getOutfileInfo(); - } - - public long getJobId() { - return jobId; - } - - public JobState getState() { - return state; - } - - @Override - public void write(DataOutput out) throws IOException { - String json = GsonUtils.GSON.toJson(this); - Text.writeString(out, json); - } - - public static StateTransfer read(DataInput in) throws IOException { - if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) { - StateTransfer transfer = new StateTransfer(); - transfer.readFields(in); - return transfer; - } - String json = Text.readString(in); - StateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJob.StateTransfer.class); - return transfer; - } - - private void readFields(DataInput in) throws IOException { - jobId = in.readLong(); - state = JobState.valueOf(Text.readString(in)); - } - - public long getStartTimeMs() { - return startTimeMs; - } - - public long getFinishTimeMs() { - return finishTimeMs; - } - - public String getOutFileInfo() { - return outFileInfo; - } - - public ExportFailMsg getFailMsg() { - return failMsg; - } - } - - public static class OutfileInfo { - @SerializedName("fileNumber") - private String fileNumber; - @SerializedName("totalRows") - private String totalRows; - @SerializedName("fileSize") - private String fileSize; - @SerializedName("url") - private String url; - - public String getUrl() { - return url; - } - - public void setUrl(String url) { - this.url = url; - } - - public String getFileNumber() { - return fileNumber; - } - - public void setFileNumber(String fileNumber) { - this.fileNumber = fileNumber; - } - - public String getTotalRows() { - return totalRows; - } - - public void setTotalRows(String totalRows) { - this.totalRows = totalRows; - } - - public String getFileSize() { - return fileSize; - } - - public void setFileSize(String fileSize) { - this.fileSize = fileSize; - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java new file mode 100644 index 0000000000..4fd4cabf80 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +public enum ExportJobState { + + /** + * the initial state of export job. + */ + PENDING, + + /** + * When the export job is waiting to be schedule. + */ + IN_QUEUE, + + /** + * When the export job is exporting, the EXPORTING state will be triggered. + */ + EXPORTING, + + /** + * When the export job is finished, the FINISHED state will be triggered. + */ + FINISHED, + + /** + * When the export job is cancelled, the CANCELLED state will be triggered. + */ + CANCELLED, +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java new file mode 100644 index 0000000000..06253b1f1e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
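
With ExportJobState now a top-level enum (the new file above), callers such as ShowExportStmt and CancelExportStmt validate user-supplied state strings via Enum.valueOf. A sketch of that validation pattern — the null-on-failure policy here is an assumption for illustration, not the patch's exact behavior:

    // Parse a SHOW EXPORT state filter; unknown names raise
    // IllegalArgumentException, which the statement logs and reports
    // back as an invalid state argument.
    static ExportJobState parseStateFilter(String value) {
        if (value == null || value.isEmpty()) {
            return null;
        }
        try {
            return ExportJobState.valueOf(value.toUpperCase());
        } catch (IllegalArgumentException e) {
            return null; // caller surfaces "invalid state"
        }
    }
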
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+@Getter
+public class ExportJobStateTransfer implements Writable {
+    @SerializedName("jobId")
+    long jobId;
+    @SerializedName("state")
+    private ExportJobState state;
+    @SerializedName("startTimeMs")
+    private long startTimeMs;
+    @SerializedName("finishTimeMs")
+    private long finishTimeMs;
+    @SerializedName("failMsg")
+    private ExportFailMsg failMsg;
+    @SerializedName("outFileInfo")
+    private String outFileInfo;
+
+    // used for reading from one log
+    public ExportJobStateTransfer() {
+        this.jobId = -1;
+        this.state = ExportJobState.CANCELLED;
+        this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
+        this.outFileInfo = "";
+    }
+
+    // used for persisting one log
+    public ExportJobStateTransfer(long jobId, ExportJobState state) {
+        this.jobId = jobId;
+        this.state = state;
+        ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
+        this.startTimeMs = job.getStartTimeMs();
+        this.finishTimeMs = job.getFinishTimeMs();
+        this.failMsg = job.getFailMsg();
+        this.outFileInfo = job.getOutfileInfo();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static ExportJobStateTransfer read(DataInput in) throws IOException {
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
+            ExportJobStateTransfer transfer = new ExportJobStateTransfer();
+            transfer.readFields(in);
+            return transfer;
+        }
+        String json = Text.readString(in);
+        ExportJobStateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJobStateTransfer.class);
+        return transfer;
+    }
+
+    private void readFields(DataInput in) throws IOException {
+        jobId = in.readLong();
+        state = ExportJobState.valueOf(Text.readString(in));
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 0e0b26f273..9a0bc7953f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -35,9 +35,9 @@ import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.ExportJob.JobState;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.task.ExportExportingTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
@@ -69,8 +69,8 @@ public class ExportMgr extends MasterDaemon {
     // lock is private and must use after db lock
     private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-    private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob
-    private Map<String, Long> labelToJobId = Maps.newHashMap();
+    private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
+    private Map<String, Long> labelToExportJobId = Maps.newHashMap();
private MasterTaskExecutor exportingExecutor; @@ -103,7 +103,7 @@ public class ExportMgr extends MasterDaemon { @Override protected void runAfterCatalogReady() { - List<ExportJob> pendingJobs = getExportJobs(JobState.PENDING); + List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING); List<ExportJob> newInQueueJobs = Lists.newArrayList(); for (ExportJob job : pendingJobs) { if (handlePendingJobs(job)) { @@ -128,7 +128,7 @@ public class ExportMgr extends MasterDaemon { private boolean handlePendingJobs(ExportJob job) { // because maybe this job has been cancelled by user. - if (job.getState() != JobState.PENDING) { + if (job.getState() != ExportJobState.PENDING) { return false; } @@ -136,11 +136,12 @@ public class ExportMgr extends MasterDaemon { // If the job is created from replay thread, all plan info will be lost. // so the job has to be cancelled. String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + // job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + job.cancelReplayedExportJob(ExportFailMsg.CancelType.RUN_FAIL, failMsg); return false; } - if (job.updateState(JobState.IN_QUEUE)) { + if (job.updateState(ExportJobState.IN_QUEUE)) { LOG.info("Exchange pending status to in_queue status success. job: {}", job); return true; } @@ -148,7 +149,7 @@ public class ExportMgr extends MasterDaemon { } public List<ExportJob> getJobs() { - return Lists.newArrayList(idToJob.values()); + return Lists.newArrayList(exportIdToJob.values()); } public void addExportJob(ExportStmt stmt) throws Exception { @@ -156,7 +157,7 @@ public class ExportMgr extends MasterDaemon { ExportJob job = createJob(jobId, stmt); writeLock(); try { - if (labelToJobId.containsKey(job.getLabel())) { + if (labelToExportJobId.containsKey(job.getLabel())) { throw new LabelAlreadyUsedException(job.getLabel()); } unprotectAddJob(job); @@ -167,6 +168,28 @@ public class ExportMgr extends MasterDaemon { LOG.info("add export job. {}", job); } + public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception { + ExportJob job = stmt.getExportJob(); + long jobId = Env.getCurrentEnv().getNextId(); + job.setId(jobId); + writeLock(); + try { + if (labelToExportJobId.containsKey(job.getLabel())) { + throw new LabelAlreadyUsedException(job.getLabel()); + } + unprotectAddJob(job); + job.getJobExecutorList().forEach(executor -> { + Long taskId = ExportJob.register.registerTask(executor); + executor.setTaskId(taskId); + job.getTaskIdToExecutor().put(taskId, executor); + }); + Env.getCurrentEnv().getEditLog().logExportCreate(job); + } finally { + writeUnlock(); + } + LOG.info("add export job. 
{}", job); + } + public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { // List of export jobs waiting to be cancelled List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt); @@ -178,14 +201,20 @@ public class ExportMgr extends MasterDaemon { if (matchExportJobs.isEmpty()) { throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)"); } - for (ExportJob exportJob : matchExportJobs) { - exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); + try { + for (ExportJob exportJob : matchExportJobs) { + // exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); + exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null, + ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); + } + } catch (JobException e) { + throw new AnalysisException(e.getMessage()); } } public void unprotectAddJob(ExportJob job) { - idToJob.put(job.getId(), job); - labelToJobId.putIfAbsent(job.getLabel(), job.getId()); + exportIdToJob.put(job.getId(), job); + labelToExportJobId.putIfAbsent(job.getLabel(), job.getId()); } private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException { @@ -224,28 +253,28 @@ public class ExportMgr extends MasterDaemon { }; } - private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception { - ExportJob job = new ExportJob(jobId); - job.setJob(stmt); - return job; + private ExportJob createJob(long jobId, ExportStmt stmt) { + ExportJob exportJob = stmt.getExportJob(); + exportJob.setId(jobId); + return exportJob; } public ExportJob getJob(long jobId) { - ExportJob job = null; + ExportJob job; readLock(); try { - job = idToJob.get(jobId); + job = exportIdToJob.get(jobId); } finally { readUnlock(); } return job; } - public List<ExportJob> getExportJobs(ExportJob.JobState state) { + public List<ExportJob> getExportJobs(ExportJobState state) { List<ExportJob> result = Lists.newArrayList(); readLock(); try { - for (ExportJob job : idToJob.values()) { + for (ExportJob job : exportIdToJob.values()) { if (job.getState() == state) { result.add(job); } @@ -260,7 +289,7 @@ public class ExportMgr extends MasterDaemon { // used for `show export` statement // NOTE: jobid and states may both specified, or only one of them, or neither public List<List<String>> getExportJobInfosByIdOrState( - long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJob.JobState> states, + long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJobState> states, ArrayList<OrderByPair> orderByPairs, long limit) throws AnalysisException { long resultNum = limit == -1L ? 
Integer.MAX_VALUE : limit; @@ -273,9 +302,9 @@ public class ExportMgr extends MasterDaemon { readLock(); try { int counter = 0; - for (ExportJob job : idToJob.values()) { + for (ExportJob job : exportIdToJob.values()) { long id = job.getId(); - ExportJob.JobState state = job.getState(); + ExportJobState state = job.getState(); String jobLabel = job.getLabel(); if (job.getDbId() != dbId) { @@ -345,7 +374,7 @@ public class ExportMgr extends MasterDaemon { readLock(); try { int counter = 0; - for (ExportJob job : idToJob.values()) { + for (ExportJob job : exportIdToJob.values()) { // check auth if (isJobShowable(job)) { exportJobInfos.add(composeExportJobInfo(job)); @@ -406,7 +435,7 @@ public class ExportMgr extends MasterDaemon { // task infos Map<String, Object> infoMap = Maps.newHashMap(); - List<String> partitions = job.getPartitions(); + List<String> partitions = job.getPartitionNames(); if (partitions == null) { partitions = Lists.newArrayList(); partitions.add("*"); @@ -422,7 +451,7 @@ public class ExportMgr extends MasterDaemon { infoMap.put("format", job.getFormat()); infoMap.put("line_delimiter", job.getLineDelimiter()); infoMap.put("columns", job.getColumns()); - infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); + infoMap.put("tablet_num", job.getTabletsNum()); infoMap.put("max_file_size", job.getMaxFileSize()); infoMap.put("delete_existing_files", job.getDeleteExistingFiles()); jobInfo.add(new Gson().toJson(infoMap)); @@ -435,7 +464,7 @@ public class ExportMgr extends MasterDaemon { jobInfo.add(job.getTimeoutSecond()); // error msg - if (job.getState() == ExportJob.JobState.CANCELLED) { + if (job.getState() == ExportJobState.CANCELLED) { ExportFailMsg failMsg = job.getFailMsg(); jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg()); } else { @@ -443,7 +472,7 @@ public class ExportMgr extends MasterDaemon { } // outfileInfo - if (job.getState() == JobState.FINISHED) { + if (job.getState() == ExportJobState.FINISHED) { jobInfo.add(job.getOutfileInfo()); } else { jobInfo.add(FeConstants.null_string); @@ -457,15 +486,15 @@ public class ExportMgr extends MasterDaemon { writeLock(); try { - Iterator<Map.Entry<Long, ExportJob>> iter = idToJob.entrySet().iterator(); + Iterator<Map.Entry<Long, ExportJob>> iter = exportIdToJob.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<Long, ExportJob> entry = iter.next(); ExportJob job = entry.getValue(); if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second - && (job.getState() == ExportJob.JobState.CANCELLED - || job.getState() == ExportJob.JobState.FINISHED)) { + && (job.getState() == ExportJobState.CANCELLED + || job.getState() == ExportJobState.FINISHED)) { iter.remove(); - labelToJobId.remove(job.getLabel(), job.getId()); + labelToExportJobId.remove(job.getLabel(), job.getId()); } } } finally { @@ -482,11 +511,12 @@ public class ExportMgr extends MasterDaemon { } } - public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) { + public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) { readLock(); try { - ExportJob job = idToJob.get(stateTransfer.getJobId()); - job.updateState(stateTransfer.getState(), true); + ExportJob job = exportIdToJob.get(stateTransfer.getJobId()); + // job.updateState(stateTransfer.getState(), true); + job.replayExportJobState(stateTransfer.getState()); job.setStartTimeMs(stateTransfer.getStartTimeMs()); job.setFinishTimeMs(stateTransfer.getFinishTimeMs()); 
job.setFailMsg(stateTransfer.getFailMsg()); @@ -496,11 +526,11 @@ public class ExportMgr extends MasterDaemon { } } - public long getJobNum(ExportJob.JobState state, long dbId) { + public long getJobNum(ExportJobState state, long dbId) { int size = 0; readLock(); try { - for (ExportJob job : idToJob.values()) { + for (ExportJob job : exportIdToJob.values()) { if (job.getState() == state && job.getDbId() == dbId) { ++size; } @@ -511,11 +541,11 @@ public class ExportMgr extends MasterDaemon { return size; } - public long getJobNum(ExportJob.JobState state) { + public long getJobNum(ExportJobState state) { int size = 0; readLock(); try { - for (ExportJob job : idToJob.values()) { + for (ExportJob job : exportIdToJob.values()) { if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(), PrivPredicate.LOAD)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java new file mode 100644 index 0000000000..5fdc6962fc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
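The new ExportTaskExecutor below is the unit of parallelism introduced by this commit: each instance owns a slice of the job's OUTFILE SELECT statements, re-checks tablet visible versions under the table read lock before running them, and reports progress through exportJob.updateExportJobState(). The batching idea behind the related config maximum_tablets_of_outfile_in_export can be sketched as follows; TabletBatcher and the literal tablet ids are illustrative rather than taken from the commit, while the config name and its default of 10 are:

    import com.google.common.collect.Lists;

    import java.util.List;

    class TabletBatcher {
        // Consecutive batches of at most maxPerOutfile tablet ids; each batch
        // would back one OUTFILE statement handled by one ExportTaskExecutor.
        static List<List<Long>> split(List<Long> tabletIds, int maxPerOutfile) {
            return Lists.partition(tabletIds, maxPerOutfile);
        }

        public static void main(String[] args) {
            List<Long> tablets = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L);
            // With the default maximum_tablets_of_outfile_in_export = 10,
            // eleven tablets yield two batches of sizes 10 and 1.
            TabletBatcher.split(tablets, 10).forEach(b -> System.out.println(b.size()));
        }
    }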
+
+package org.apache.doris.load;
+
+import org.apache.doris.analysis.OutFileClause;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.ExportFailMsg.CancelType;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class ExportTaskExecutor implements TransientTaskExecutor {
+
+    List<SelectStmt> selectStmtLists;
+
+    ExportJob exportJob;
+
+    @Setter
+    Long taskId;
+
+    private StmtExecutor stmtExecutor;
+
+    private AtomicBoolean isCanceled;
+
+    private AtomicBoolean isFinished;
+
+    ExportTaskExecutor(List<SelectStmt> selectStmtLists, ExportJob exportJob) {
+        this.selectStmtLists = selectStmtLists;
+        this.exportJob = exportJob;
+        this.isCanceled = new AtomicBoolean(false);
+        this.isFinished = new AtomicBoolean(false);
+    }
+
+    @Override
+    public void execute() throws JobException {
+        if (isCanceled.get()) {
+            throw new JobException("Export executor has been canceled, task id: {}", taskId);
+        }
+        exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
+        List<OutfileInfo> outfileInfoList = Lists.newArrayList();
+        for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
+            if (isCanceled.get()) {
+                throw new JobException("Export executor has been canceled, task id: {}", taskId);
+            }
+            // check the version of tablets
+            try {
+                Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
+                        exportJob.getTableName().getDb());
+                OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
+                table.readLock();
+                try {
+                    SelectStmt selectStmt = selectStmtLists.get(idx);
+                    List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
+                    for (Long tabletId : tabletIds) {
+                        TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
+                                tabletId);
+                        Partition partition = table.getPartition(tabletMeta.getPartitionId());
+                        long nowVersion = partition.getVisibleVersion();
+                        long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
+                        if (nowVersion != oldVersion) {
+                            exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                                    CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
+                            throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}, "
+                                    + "now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            } catch (AnalysisException e) {
+                exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+                        ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+                throw new JobException(e);
+            }
+
+            try (AutoCloseConnectContext r = buildConnectContext()) {
+                stmtExecutor = new
StmtExecutor(r.connectContext, selectStmtLists.get(idx)); + stmtExecutor.execute(); + if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) { + exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, + ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage()); + return; + } + OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); + outfileInfoList.add(outfileInfo); + } catch (Exception e) { + exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, + ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()); + throw new JobException(e); + } finally { + stmtExecutor.addProfileToSpan(); + } + } + if (isCanceled.get()) { + throw new JobException("Export executor has been canceled, task id: {}", taskId); + } + exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null); + isFinished.getAndSet(true); + } + + @Override + public void cancel() throws JobException { + if (isFinished.get()) { + throw new JobException("Export executor has finished, task id: {}", taskId); + } + isCanceled.getAndSet(true); + if (stmtExecutor != null) { + stmtExecutor.cancel(); + } + } + + private AutoCloseConnectContext buildConnectContext() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setSessionVariable(exportJob.getSessionVariables()); + connectContext.setEnv(Env.getCurrentEnv()); + connectContext.setDatabase(exportJob.getTableName().getDb()); + connectContext.setQualifiedUser(exportJob.getQualifiedUser()); + connectContext.setCurrentUserIdentity(exportJob.getUserIdentity()); + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + connectContext.setQueryId(queryId); + connectContext.setStartTime(); + connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); + return new AutoCloseConnectContext(connectContext); + } + + private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) { + OutfileInfo outfileInfo = new OutfileInfo(); + outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER)); + outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS)); + outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes"); + outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL)); + return outfileInfo; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java new file mode 100644 index 0000000000..b9befd9d32 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
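OutfileInfo, formerly an inner class of ExportJob, becomes the small top-level lombok @Data holder below; each finished OUTFILE statement produces one instance, and the job serializes them to JSON for the outfile info column of SHOW EXPORT. Judging by the regression-test changes at the end of this diff (json.url[0][0]), that JSON is now a list of per-task lists. A hedged round-trip sketch, with made-up values:

    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;

    import java.lang.reflect.Type;
    import java.util.List;

    class OutfileInfoJsonSketch {
        public static void main(String[] args) {
            // Field names follow the @SerializedName annotations below; the
            // outer list has one entry per export task, the inner one per file.
            String json = "[[{\"fileNumber\":\"1\",\"totalRows\":\"100\","
                    + "\"fileSize\":\"1024bytes\",\"url\":\"file:///tmp/export/\"}]]";
            Type t = new TypeToken<List<List<OutfileInfo>>>() {}.getType();
            List<List<OutfileInfo>> infos = new Gson().fromJson(json, t);
            System.out.println(infos.get(0).get(0).getUrl());  // getter generated by @Data
        }
    }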
+ +package org.apache.doris.load; + +import com.google.gson.annotations.SerializedName; +import lombok.Data; + +@Data +public class OutfileInfo { + + @SerializedName("fileNumber") + private String fileNumber; + + @SerializedName("totalRows") + private String totalRows; + + @SerializedName("fileSize") + private String fileSize; + + @SerializedName("url") + private String url; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 68d2153556..9a1841be69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -61,6 +61,8 @@ import org.apache.doris.journal.local.LocalJournal; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.DeleteInfo; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; +import org.apache.doris.load.ExportJobStateTransfer; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.LoadJob; import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord; @@ -365,7 +367,7 @@ public class EditLog { break; } case OperationType.OP_EXPORT_UPDATE_STATE: { - ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData(); + ExportJobStateTransfer op = (ExportJobStateTransfer) journal.getData(); ExportMgr exportMgr = env.getExportMgr(); exportMgr.replayUpdateJobState(op); break; @@ -1461,8 +1463,8 @@ public class EditLog { logEdit(OperationType.OP_EXPORT_CREATE, job); } - public void logExportUpdateState(long jobId, ExportJob.JobState newState) { - ExportJob.StateTransfer transfer = new ExportJob.StateTransfer(jobId, newState); + public void logExportUpdateState(long jobId, ExportJobState newState) { + ExportJobStateTransfer transfer = new ExportJobStateTransfer(jobId, newState); logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 11190edc8b..7637c3869d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -181,7 +181,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.external.iceberg.IcebergTableCreationRecord; import org.apache.doris.load.DeleteHandler; -import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; import org.apache.doris.load.LoadJob; @@ -1922,8 +1922,8 @@ public class ShowExecutor { ExportMgr exportMgr = env.getExportMgr(); - Set<ExportJob.JobState> states = null; - ExportJob.JobState state = showExportStmt.getJobState(); + Set<ExportJobState> states = null; + ExportJobState state = showExportStmt.getJobState(); if (state != null) { states = Sets.newHashSet(state); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index cdd0d66f89..3bbd96408a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2213,7 +2213,8 @@ public class StmtExecutor { private void handleExportStmt() throws Exception { ExportStmt exportStmt = (ExportStmt) parsedStmt; - context.getEnv().getExportMgr().addExportJob(exportStmt); 
+ // context.getEnv().getExportMgr().addExportJob(exportStmt); + context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt); } private void handleCtasStmt() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 48f3ce609e..1b17806c71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -30,8 +30,8 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.load.ExportFailMsg; import org.apache.doris.load.ExportFailMsg.CancelType; import org.apache.doris.load.ExportJob; -import org.apache.doris.load.ExportJob.JobState; -import org.apache.doris.load.ExportJob.OutfileInfo; +import org.apache.doris.load.ExportJobState; +import org.apache.doris.load.OutfileInfo; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; @@ -69,9 +69,9 @@ public class ExportExportingTask extends MasterTask { private ExportFailMsg failMsg; - private ExportJob.OutfileInfo outfileInfo; + private OutfileInfo outfileInfo; - public ExportResult(boolean isFailed, ExportFailMsg failMsg, ExportJob.OutfileInfo outfileInfo) { + public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) { this.isFailed = isFailed; this.failMsg = failMsg; this.outfileInfo = outfileInfo; @@ -93,11 +93,11 @@ public class ExportExportingTask extends MasterTask { @Override protected void exec() { - if (job.getState() == JobState.IN_QUEUE) { + if (job.getState() == ExportJobState.IN_QUEUE) { handleInQueueState(); } - if (job.getState() != ExportJob.JobState.EXPORTING) { + if (job.getState() != ExportJobState.EXPORTING) { return; } LOG.info("begin execute export job in exporting state. 
job: {}", job); @@ -112,7 +112,7 @@ public class ExportExportingTask extends MasterTask { List<SelectStmt> selectStmtList = job.getSelectStmtList(); int completeTaskNum = 0; - List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList(); + List<OutfileInfo> outfileInfoList = Lists.newArrayList(); int parallelNum = selectStmtList.size(); CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool); @@ -122,7 +122,7 @@ public class ExportExportingTask extends MasterTask { final int idx = i; completionService.submit(() -> { // maybe user cancelled this job - if (job.getState() != JobState.EXPORTING) { + if (job.getState() != ExportJobState.EXPORTING) { return new ExportResult(true, null, null); } try { @@ -162,7 +162,7 @@ public class ExportExportingTask extends MasterTask { return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage()), null); } - ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); + OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); return new ExportResult(false, null, outfileInfo); } catch (Exception e) { return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, @@ -250,8 +250,8 @@ public class ExportExportingTask extends MasterTask { return new AutoCloseConnectContext(connectContext); } - private ExportJob.OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) { - ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo(); + private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) { + OutfileInfo outfileInfo = new OutfileInfo(); outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER)); outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS)); outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes"); @@ -274,7 +274,7 @@ public class ExportExportingTask extends MasterTask { // return; // } - if (job.updateState(ExportJob.JobState.EXPORTING)) { + if (job.updateState(ExportJobState.EXPORTING)) { LOG.info("Exchange pending status to exporting status success. 
job: {}", job); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index c06cae0500..1e49f207d0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.load.ExportMgr; import org.apache.doris.utframe.TestWithFeService; @@ -154,12 +155,12 @@ public class CancelExportStmtTest extends TestWithFeService { List<ExportJob> exportJobList2 = Lists.newLinkedList(); ExportJob job1 = new ExportJob(); ExportJob job2 = new ExportJob(); - job2.updateState(ExportJob.JobState.CANCELLED, true); + job2.updateState(ExportJobState.CANCELLED, true); ExportJob job3 = new ExportJob(); - job3.updateState(ExportJob.JobState.EXPORTING, false); + job3.updateState(ExportJobState.EXPORTING, false); ExportJob job4 = new ExportJob(); ExportJob job5 = new ExportJob(); - job5.updateState(ExportJob.JobState.IN_QUEUE, false); + job5.updateState(ExportJobState.IN_QUEUE, false); exportJobList1.add(job1); exportJobList1.add(job2); exportJobList1.add(job3); @@ -188,15 +189,6 @@ public class CancelExportStmtTest extends TestWithFeService { Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1); - stateStringLiteral = new StringLiteral("IN_QUEUE"); - stateEqPredicate = - new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); - stmt = new CancelExportStmt(null, stateEqPredicate); - stmt.analyze(analyzer); - filter = ExportMgr.buildCancelJobFilter(stmt); - - Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1); - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index 7d1cd5a47a..e7209d58a4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -30,6 +30,7 @@ import org.apache.doris.common.VariableAnnotation; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJobState; import org.apache.doris.task.ExportExportingTask; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.utframe.TestWithFeService; @@ -171,14 +172,14 @@ public class SessionVariablesTest extends TestWithFeService { ExportStmt exportStmt = (ExportStmt) parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext); - ExportJob job = new ExportJob(1234); - job.setJob(exportStmt); + ExportJob job = exportStmt.getExportJob(); + job.setId(1234); new Expectations(job) { { job.getState(); minTimes = 0; - result = ExportJob.JobState.EXPORTING; + result = ExportJobState.EXPORTING; } }; @@ -201,14 +202,14 @@ public class SessionVariablesTest extends TestWithFeService { ExportStmt exportStmt = (ExportStmt) parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext); - ExportJob job = new ExportJob(1234); - job.setJob(exportStmt); + ExportJob job = exportStmt.getExportJob(); + job.setId(1234); new 
Expectations(job) { { job.getState(); minTimes = 0; - result = ExportJob.JobState.EXPORTING; + result = ExportJobState.EXPORTING; } }; diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy index 95805db491..162b63065e 100644 --- a/regression-test/suites/export_p0/test_export_basic.groovy +++ b/regression-test/suites/export_p0/test_export_basic.groovy @@ -69,7 +69,8 @@ suite("test_export_basic", "p0") { PARTITION between_20_70 VALUES [("20"),("70")), PARTITION more_than_70 VALUES LESS THAN ("151") ) - DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES("replication_num" = "1"); """ StringBuilder sb = new StringBuilder() int i = 1 diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy index 205b1ffd71..3be2794cbd 100644 --- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy +++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy @@ -37,7 +37,8 @@ suite("test_export_with_hdfs", "p2") { PARTITION between_20_70 VALUES [("20"),("70")), PARTITION more_than_70 VALUES LESS THAN ("151") ) - DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES("replication_num" = "1"); """ StringBuilder sb = new StringBuilder() int i = 1 @@ -62,8 +63,9 @@ suite("test_export_with_hdfs", "p2") { if (res[0][2] == "FINISHED") { def json = parseJson(res[0][11]) assert json instanceof List - assertEquals("1", json.fileNumber[0]) - return json.url[0]; + assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; } else if (res[0][2] == "CANCELLED") { throw new IllegalStateException("""export failed: ${res[0][10]}""") } else { diff --git a/regression-test/suites/export_p2/test_export_with_s3.groovy b/regression-test/suites/export_p2/test_export_with_s3.groovy index a26dde3238..82aa831cdd 100644 --- a/regression-test/suites/export_p2/test_export_with_s3.groovy +++ b/regression-test/suites/export_p2/test_export_with_s3.groovy @@ -38,7 +38,8 @@ suite("test_export_with_s3", "p2") { PARTITION between_20_70 VALUES [("20"),("70")), PARTITION more_than_70 VALUES LESS THAN ("151") ) - DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1"); + DISTRIBUTED BY HASH(id) BUCKETS 3 + PROPERTIES("replication_num" = "1"); """ StringBuilder sb = new StringBuilder() int i = 1 @@ -63,9 +64,9 @@ suite("test_export_with_s3", "p2") { if (res[0][2] == "FINISHED") { def json = parseJson(res[0][11]) assert json instanceof List - assertEquals("1", json.fileNumber[0]) - log.info("outfile_path: ${json.url[0]}") - return json.url[0]; + assertEquals("1", json.fileNumber[0][0]) + log.info("outfile_path: ${json.url[0][0]}") + return json.url[0][0]; } else if (res[0][2] == "CANCELLED") { throw new IllegalStateException("""export failed: ${res[0][10]}""") } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
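How the pieces fit together: StmtExecutor.handleExportStmt() now routes to ExportMgr.addExportJobAndRegisterTask(), which takes the ExportJob built during analysis, registers one transient task per executor with the Job Schedule framework, and then logs the create edit so the job is replayable. Condensed from the hunks above (abridged for illustration; locking and the duplicate-label check are omitted, so this is not a verbatim copy of the method):

    // Condensed restatement of ExportMgr.addExportJobAndRegisterTask(ExportStmt).
    public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception {
        ExportJob job = stmt.getExportJob();              // plan was built at analysis time
        job.setId(Env.getCurrentEnv().getNextId());
        for (ExportTaskExecutor executor : job.getJobExecutorList()) {
            Long taskId = ExportJob.register.registerTask(executor);  // Job Schedule framework
            executor.setTaskId(taskId);
            job.getTaskIdToExecutor().put(taskId, executor);
        }
        Env.getCurrentEnv().getEditLog().logExportCreate(job);        // replayable on FE restart
    }

This per-task result collection is also why the regression suites switched from json.url[0] to json.url[0][0]: the outfile info JSON now nests one list of file results per task.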