This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch fix_export_timeout_not_work in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6057aeed326097e93dd31fd5d9fd5718daf42fa1 Author: caiconghui1 <caicongh...@jd.com> AuthorDate: Wed Oct 25 19:32:58 2023 +0800 [fix](export) fix timeout property not work for export job --- .../java/org/apache/doris/analysis/ExportStmt.java | 22 +++++++++++++++++++--- .../org/apache/doris/load/ExportTaskExecutor.java | 1 + .../trees/plans/commands/ExportCommand.java | 18 ++++++++++++++++-- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 35e658d446d..b6ec8df488b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -58,7 +58,7 @@ import java.util.stream.Collectors; // EXPORT TABLE table_name [PARTITION (name1[, ...])] // TO 'export_target_path' // [PROPERTIES("key"="value")] -// BY BROKER 'broker_name' [( $broker_attrs)] +// WITH BROKER 'broker_name' [( $broker_attrs)] @Getter public class ExportStmt extends StatementBase { public static final String PARALLELISM = "parallelism"; @@ -67,6 +67,7 @@ public class ExportStmt extends StatementBase { private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; + private static final Integer DEFAULT_TIMEOUT = 7200; private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(LABEL) @@ -76,6 +77,7 @@ public class ExportStmt extends StatementBase { .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) .add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER) + .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") .build(); @@ -97,6 +99,8 @@ public class ExportStmt extends StatementBase { private Integer parallelism; + private Integer timeout; + private String maxFileSize; private 
String deleteExistingFiles; private SessionVariable sessionVariables; @@ -118,6 +122,7 @@ public class ExportStmt extends StatementBase { this.brokerDesc = brokerDesc; this.columnSeparator = DEFAULT_COLUMN_SEPARATOR; this.lineDelimiter = DEFAULT_LINE_DELIMITER; + this.timeout = DEFAULT_TIMEOUT; Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable( ConnectContext.get().getSessionVariable()); @@ -232,8 +237,10 @@ public class ExportStmt extends StatementBase { // set sessions exportJob.setQualifiedUser(this.qualifiedUser); exportJob.setUserIdentity(this.userIdentity); - exportJob.setSessionVariables(this.sessionVariables); - exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS()); + SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable( + ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable())); + exportJob.setSessionVariables(clonedSessionVariable); + exportJob.setTimeoutSecond(this.timeout); exportJob.setOrigStmt(this.getOrigStmt()); } @@ -323,6 +330,15 @@ public class ExportStmt extends StatementBase { throw new UserException("The value of parallelism is invalid!"); } + // timeout + String timeoutString = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT, + String.valueOf(DEFAULT_TIMEOUT)); + try { + this.timeout = Integer.parseInt(timeoutString); + } catch (NumberFormatException e) { + throw new UserException("The value of timeout is invalid!"); + } + // max_file_size this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""); this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index c7d7c4032ce..b647154dc11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -168,6 +168,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor { private AutoCloseConnectContext buildConnectContext() { ConnectContext connectContext = new ConnectContext(); + exportJob.getSessionVariables().setQueryTimeoutS(exportJob.getTimeoutSecond()); connectContext.setSessionVariable(exportJob.getSessionVariables()); connectContext.setEnv(Env.getCurrentEnv()); connectContext.setDatabase(exportJob.getTableName().getDb()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index 42ae03aaec4..f2cd13a4793 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -68,7 +68,7 @@ import java.util.stream.Collectors; * EXPORT TABLE table_name [PARTITION (name1[, ...])] * TO 'export_target_path' * [PROPERTIES("key"="value")] - * BY BROKER 'broker_name' [( $broker_attrs)] + * WITH BROKER 'broker_name' [( $broker_attrs)] */ public class ExportCommand extends Command implements ForwardWithSync { public static final String PARALLELISM = "parallelism"; @@ -76,6 +76,8 @@ public class ExportCommand extends Command implements ForwardWithSync { private static final String DEFAULT_COLUMN_SEPARATOR = "\t"; private static final String DEFAULT_LINE_DELIMITER = "\n"; private static final String DEFAULT_PARALLELISM = "1"; + private static final Integer DEFAULT_TIMEOUT = 7200; + private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(LABEL) .add(PARALLELISM) @@ -84,6 +86,7 @@ public class ExportCommand extends Command implements ForwardWithSync { .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) 
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER) + .add(PropertyAnalyzer.PROPERTIES_TIMEOUT) .add("format") .build(); @@ -305,7 +308,18 @@ public class ExportCommand extends Command implements ForwardWithSync { SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable( ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable())); exportJob.setSessionVariables(clonedSessionVariable); - exportJob.setTimeoutSecond(clonedSessionVariable.getQueryTimeoutS()); + + // set timeoutSecond + int timeoutSecond; + String timeoutString = fileProperties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT, + String.valueOf(DEFAULT_TIMEOUT)); + try { + timeoutSecond = Integer.parseInt(timeoutString); + } catch (NumberFormatException e) { + throw new UserException("The value of timeout is invalid!"); + } + + exportJob.setTimeoutSecond(timeoutSecond); // exportJob generate outfile sql exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org