This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e217401bf24 [Chore](nereids) Remove createRoutineLoadStmt (#55144)
e217401bf24 is described below
commit e217401bf244ea2614c6c04deb604e3cedaa1ca5
Author: yaoxiao <[email protected]>
AuthorDate: Tue Aug 26 15:27:40 2025 +0800
[Chore](nereids) Remove createRoutineLoadStmt (#55144)
---
.../bloom_parquet_table/bloom_parquet_table | Bin 58801541 -> 0 bytes
.../doris/analysis/CreateRoutineLoadStmt.java | 554 ---------------------
.../load/routineload/KafkaRoutineLoadJob.java | 47 +-
.../doris/load/routineload/RoutineLoadJob.java | 124 ++---
.../doris/load/routineload/RoutineLoadManager.java | 33 --
.../plans/commands/info/CreateRoutineLoadInfo.java | 7 +-
.../main/java/org/apache/doris/qe/DdlExecutor.java | 6 -
.../load/routineload/KafkaRoutineLoadJobTest.java | 36 +-
.../load/routineload/RoutineLoadManagerTest.java | 96 +---
.../persist/AlterRoutineLoadOperationLogTest.java | 6 +-
10 files changed, 71 insertions(+), 838 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table
b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table
deleted file mode 100644
index a861d2dfbe3..00000000000
Binary files
a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/bloom_parquet_table/bloom_parquet_table
and /dev/null differ
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
deleted file mode 100644
index ba647e74a37..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ /dev/null
@@ -1,554 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.analysis;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
-import org.apache.doris.load.RoutineLoadDesc;
-import org.apache.doris.load.loadv2.LoadTask;
-import org.apache.doris.load.routineload.AbstractDataSourceProperties;
-import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
-import org.apache.doris.load.routineload.RoutineLoadJob;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.OriginStatement;
-import org.apache.doris.resource.workloadgroup.WorkloadGroup;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import lombok.Getter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Predicate;
-
-/*
- Create routine Load statement, continually load data from a streaming app
-
- syntax:
- CREATE ROUTINE LOAD [database.]name on table
- [load properties]
- [PROPERTIES
- (
- desired_concurrent_number = xxx,
- max_error_number = xxx,
- k1 = v1,
- ...
- kn = vn
- )]
- FROM type of routine load
- [(
- k1 = v1,
- ...
- kn = vn
- )]
-
- load properties:
- load property [[,] load property] ...
-
- load property:
- column separator | columns_mapping | partitions | where
-
- column separator:
- COLUMNS TERMINATED BY xxx
- columns_mapping:
- COLUMNS (c1, c2, c3 = c1 + c2)
- partitions:
- PARTITIONS (p1, p2, p3)
- where:
- WHERE c1 > 1
-
- type of routine load:
- KAFKA
-*/
-public class CreateRoutineLoadStmt extends DdlStmt implements
NotFallbackInParser {
- private static final Logger LOG =
LogManager.getLogger(CreateRoutineLoadStmt.class);
-
- // routine load properties
- public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY =
"desired_concurrent_number";
- public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY =
"current_concurrent_number";
- // max error number in ten thousand records
- public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
- public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
- // the following 3 properties limit the time and batch size of a single
routine load task
- public static final String MAX_BATCH_INTERVAL_SEC_PROPERTY =
"max_batch_interval";
- public static final String MAX_BATCH_ROWS_PROPERTY = "max_batch_rows";
- public static final String MAX_BATCH_SIZE_PROPERTY = "max_batch_size";
- public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
-
- public static final String FORMAT = "format"; // the value is csv or json,
default is csv
- public static final String STRIP_OUTER_ARRAY = "strip_outer_array";
- public static final String JSONPATHS = "jsonpaths";
- public static final String JSONROOT = "json_root";
- public static final String NUM_AS_STRING = "num_as_string";
- public static final String FUZZY_PARSE = "fuzzy_parse";
-
- public static final String PARTIAL_COLUMNS = "partial_columns";
-
- public static final String WORKLOAD_GROUP = "workload_group";
-
- private static final String NAME_TYPE = "ROUTINE LOAD NAME";
- public static final String ENDPOINT_REGEX =
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
- public static final String SEND_BATCH_PARALLELISM =
"send_batch_parallelism";
- public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
-
- private AbstractDataSourceProperties dataSourceProperties;
-
-
- private static final ImmutableSet<String> PROPERTIES_SET = new
ImmutableSet.Builder<String>()
- .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
- .add(MAX_ERROR_NUMBER_PROPERTY)
- .add(MAX_FILTER_RATIO_PROPERTY)
- .add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
- .add(MAX_BATCH_ROWS_PROPERTY)
- .add(MAX_BATCH_SIZE_PROPERTY)
- .add(FORMAT)
- .add(JSONPATHS)
- .add(STRIP_OUTER_ARRAY)
- .add(NUM_AS_STRING)
- .add(FUZZY_PARSE)
- .add(JSONROOT)
- .add(LoadStmt.STRICT_MODE)
- .add(LoadStmt.TIMEZONE)
- .add(EXEC_MEM_LIMIT_PROPERTY)
- .add(SEND_BATCH_PARALLELISM)
- .add(LOAD_TO_SINGLE_TABLET)
- .add(PARTIAL_COLUMNS)
- .add(WORKLOAD_GROUP)
- .add(LoadStmt.KEY_ENCLOSE)
- .add(LoadStmt.KEY_ESCAPE)
- .build();
-
- private final LabelName labelName;
- private String tableName;
- private final List<ParseNode> loadPropertyList;
- private final Map<String, String> jobProperties;
- private final String typeName;
-
- // the following variables will be initialized after analyze
- // -1 as unset, the default value will set in RoutineLoadJob
- private String name;
- private String dbName;
- private RoutineLoadDesc routineLoadDesc;
- private int desiredConcurrentNum = 1;
- private long maxErrorNum = -1;
- private double maxFilterRatio = -1;
- private long maxBatchIntervalS = -1;
- private long maxBatchRows = -1;
- private long maxBatchSizeBytes = -1;
- private boolean strictMode = true;
- private long execMemLimit = 2 * 1024 * 1024 * 1024L;
- private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
- private int sendBatchParallelism = 1;
- private boolean loadToSingleTablet = false;
-
- private FileFormatProperties fileFormatProperties;
-
- private String workloadGroupName = "";
-
- /**
- * support partial columns load(Only Unique Key Columns)
- */
- @Getter
- private boolean isPartialUpdate = false;
-
- private String comment = "";
-
- private LoadTask.MergeType mergeType;
-
- private boolean isMultiTable = false;
-
- public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v)
-> v > 0L;
- public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> v >= 0L;
- public static final Predicate<Double> MAX_FILTER_RATIO_PRED = (v) -> v >=
0 && v <= 1;
- public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> v >=
1;
- public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >=
200000;
- public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100
* 1024 * 1024
- && v <= (long)
(1024 * 1024 * 1024) * 10;
- public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
- public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v
> 0L;
-
- public CreateRoutineLoadStmt(LabelName labelName, String tableName,
List<ParseNode> loadPropertyList,
- Map<String, String> jobProperties, String
typeName,
- Map<String, String> dataSourceProperties,
LoadTask.MergeType mergeType,
- String comment) {
- this.labelName = labelName;
- if (StringUtils.isBlank(tableName)) {
- this.isMultiTable = true;
- }
- this.tableName = tableName;
- this.loadPropertyList = loadPropertyList;
- this.jobProperties = jobProperties == null ? Maps.newHashMap() :
jobProperties;
- this.typeName = typeName.toUpperCase();
- this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
- .createDataSource(typeName, dataSourceProperties,
this.isMultiTable);
- this.mergeType = mergeType;
- this.isPartialUpdate =
this.jobProperties.getOrDefault(PARTIAL_COLUMNS,
"false").equalsIgnoreCase("true");
- if (comment != null) {
- this.comment = comment;
- }
- String format =
jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
- fileFormatProperties =
FileFormatProperties.createFileFormatProperties(format);
- }
-
- /*
- * make stmt by nereids
- */
- public CreateRoutineLoadStmt(LabelName labelName, String dbName, String
name, String tableName,
- List<ParseNode> loadPropertyList, OriginStatement origStmt,
UserIdentity userIdentity,
- Map<String, String> jobProperties, String typeName,
RoutineLoadDesc routineLoadDesc,
- int desireTaskConcurrentNum, long maxErrorNum, double
maxFilterRatio, long maxBatchIntervalS,
- long maxBatchRows, long maxBatchSizeBytes, long execMemLimit, int
sendBatchParallelism, String timezone,
- String workloadGroupName, boolean loadToSingleTablet, boolean
strictMode,
- boolean isPartialUpdate, AbstractDataSourceProperties
dataSourceProperties,
- FileFormatProperties fileFormatProperties) {
- this.labelName = labelName;
- this.dbName = dbName;
- this.name = name;
- this.tableName = tableName;
- this.loadPropertyList = loadPropertyList;
- this.setOrigStmt(origStmt);
- this.setUserInfo(userIdentity);
- this.jobProperties = jobProperties;
- this.typeName = typeName;
- this.routineLoadDesc = routineLoadDesc;
- this.desiredConcurrentNum = desireTaskConcurrentNum;
- this.maxErrorNum = maxErrorNum;
- this.maxFilterRatio = maxFilterRatio;
- this.maxBatchIntervalS = maxBatchIntervalS;
- this.maxBatchRows = maxBatchRows;
- this.maxBatchSizeBytes = maxBatchSizeBytes;
- this.execMemLimit = execMemLimit;
- this.sendBatchParallelism = sendBatchParallelism;
- this.timezone = timezone;
- this.workloadGroupName = workloadGroupName;
- this.loadToSingleTablet = loadToSingleTablet;
- this.strictMode = strictMode;
- this.isPartialUpdate = isPartialUpdate;
- this.dataSourceProperties = dataSourceProperties;
- this.fileFormatProperties = fileFormatProperties;
- }
-
- public String getName() {
- return name;
- }
-
- public String getDBName() {
- return dbName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getTypeName() {
- return typeName;
- }
-
- public RoutineLoadDesc getRoutineLoadDesc() {
- return routineLoadDesc;
- }
-
- public int getDesiredConcurrentNum() {
- return desiredConcurrentNum;
- }
-
- public long getMaxErrorNum() {
- return maxErrorNum;
- }
-
- public double getMaxFilterRatio() {
- return maxFilterRatio;
- }
-
- public long getMaxBatchIntervalS() {
- return maxBatchIntervalS;
- }
-
- public long getMaxBatchRows() {
- return maxBatchRows;
- }
-
- public long getMaxBatchSize() {
- return maxBatchSizeBytes;
- }
-
- public long getExecMemLimit() {
- return execMemLimit;
- }
-
- public int getSendBatchParallelism() {
- return sendBatchParallelism;
- }
-
- public boolean isLoadToSingleTablet() {
- return loadToSingleTablet;
- }
-
- public boolean isStrictMode() {
- return strictMode;
- }
-
- public String getTimezone() {
- return timezone;
- }
-
- public LoadTask.MergeType getMergeType() {
- return mergeType;
- }
-
- public FileFormatProperties getFileFormatProperties() {
- return fileFormatProperties;
- }
-
- public AbstractDataSourceProperties getDataSourceProperties() {
- return dataSourceProperties;
- }
-
- public String getComment() {
- return comment;
- }
-
- public String getWorkloadGroupName() {
- return this.workloadGroupName;
- }
-
- @Override
- public void analyze() throws UserException {
- super.analyze();
- // check dbName and tableName
- checkDBTable();
- // check name
- try {
- FeNameFormat.checkCommonName(NAME_TYPE, name);
- } catch (AnalysisException e) {
- // 64 is the length of regular expression matching
- // (FeNameFormat.COMMON_NAME_REGEX/UNDERSCORE_COMMON_NAME_REGEX)
- throw new AnalysisException(e.getMessage()
- + " Maybe routine load job name is longer than 64 or
contains illegal characters");
- }
- // check load properties include column separator etc.
- checkLoadProperties();
- // check routine load job properties include desired concurrent number
etc.
- checkJobProperties();
- // check data source properties
- checkDataSourceProperties();
- // analyze merge type
- if (routineLoadDesc != null) {
- routineLoadDesc.analyze();
- } else if (mergeType == LoadTask.MergeType.MERGE) {
- throw new AnalysisException("Excepted DELETE ON clause when merge
type is MERGE.");
- }
- }
-
- public void checkDBTable() throws AnalysisException {
- labelName.analyze();
- dbName = labelName.getDbName();
- name = labelName.getLabelName();
- Database db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
- if (isPartialUpdate && isMultiTable) {
- throw new AnalysisException("Partial update is not supported in
multi-table load.");
- }
- if (isMultiTable) {
- return;
- }
- if (Strings.isNullOrEmpty(tableName)) {
- throw new AnalysisException("Table name should not be null");
- }
- Table table = db.getTableOrAnalysisException(tableName);
- if (mergeType != LoadTask.MergeType.APPEND
- && (table.getType() != Table.TableType.OLAP
- || ((OlapTable) table).getKeysType() != KeysType.UNIQUE_KEYS))
{
- throw new AnalysisException("load by MERGE or DELETE is only
supported in unique tables.");
- }
- if (mergeType != LoadTask.MergeType.APPEND
- && !(table.getType() == Table.TableType.OLAP && ((OlapTable)
table).hasDeleteSign())) {
- throw new AnalysisException("load by MERGE or DELETE need to
upgrade table to support batch delete.");
- }
- if (isPartialUpdate && !((OlapTable)
table).getEnableUniqueKeyMergeOnWrite()) {
- throw new AnalysisException("load by PARTIAL_COLUMNS is only
supported in unique table MoW");
- }
- }
-
- public void checkLoadProperties() throws UserException {
- Separator columnSeparator = null;
- // TODO(yangzhengguo01): add line delimiter to properties
- Separator lineDelimiter = null;
- ImportColumnsStmt importColumnsStmt = null;
- ImportWhereStmt precedingImportWhereStmt = null;
- ImportWhereStmt importWhereStmt = null;
- ImportSequenceStmt importSequenceStmt = null;
- PartitionNames partitionNames = null;
- ImportDeleteOnStmt importDeleteOnStmt = null;
- if (loadPropertyList != null) {
- for (ParseNode parseNode : loadPropertyList) {
- if (parseNode instanceof Separator) {
- // check column separator
- if (columnSeparator != null) {
- throw new AnalysisException("repeat setting of column
separator");
- }
- columnSeparator = (Separator) parseNode;
- columnSeparator.analyze();
- } else if (parseNode instanceof ImportColumnsStmt) {
- if (isMultiTable) {
- throw new AnalysisException("Multi-table load does not
support setting columns info");
- }
- // check columns info
- if (importColumnsStmt != null) {
- throw new AnalysisException("repeat setting of columns
info");
- }
- importColumnsStmt = (ImportColumnsStmt) parseNode;
- } else if (parseNode instanceof ImportWhereStmt) {
- // check where expr
- ImportWhereStmt node = (ImportWhereStmt) parseNode;
- if (node.isPreceding()) {
- if (isMultiTable) {
- throw new AnalysisException("Multi-table load does
not support setting columns info");
- }
- if (precedingImportWhereStmt != null) {
- throw new AnalysisException("repeat setting of
preceding where predicate");
- }
- precedingImportWhereStmt = node;
- } else {
- if (importWhereStmt != null) {
- throw new AnalysisException("repeat setting of
where predicate");
- }
- importWhereStmt = node;
- }
- } else if (parseNode instanceof PartitionNames) {
- // check partition names
- if (partitionNames != null) {
- throw new AnalysisException("repeat setting of
partition names");
- }
- partitionNames = (PartitionNames) parseNode;
- partitionNames.analyze();
- } else if (parseNode instanceof ImportDeleteOnStmt) {
- // check delete expr
- if (importDeleteOnStmt != null) {
- throw new AnalysisException("repeat setting of delete
predicate");
- }
- importDeleteOnStmt = (ImportDeleteOnStmt) parseNode;
- } else if (parseNode instanceof ImportSequenceStmt) {
- // check sequence column
- if (importSequenceStmt != null) {
- throw new AnalysisException("repeat setting of
sequence column");
- }
- importSequenceStmt = (ImportSequenceStmt) parseNode;
- }
- }
- }
- routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter,
importColumnsStmt,
- precedingImportWhereStmt, importWhereStmt,
- partitionNames, importDeleteOnStmt == null ? null :
importDeleteOnStmt.getExpr(), mergeType,
- importSequenceStmt == null ? null :
importSequenceStmt.getSequenceColName());
- }
-
- private void checkJobProperties() throws UserException {
- Optional<String> optional = jobProperties.keySet().stream().filter(
- entity -> !PROPERTIES_SET.contains(entity)).findFirst();
- if (optional.isPresent()) {
- throw new AnalysisException(optional.get() + " is invalid
property");
- }
-
- desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(
- jobProperties.get(DESIRED_CONCURRENT_NUMBER_PROPERTY),
- Config.max_routine_load_task_concurrent_num,
DESIRED_CONCURRENT_NUMBER_PRED,
- DESIRED_CONCURRENT_NUMBER_PROPERTY + " must be greater than
0")).intValue();
-
- maxErrorNum =
Util.getLongPropertyOrDefault(jobProperties.get(MAX_ERROR_NUMBER_PROPERTY),
- RoutineLoadJob.DEFAULT_MAX_ERROR_NUM, MAX_ERROR_NUMBER_PRED,
- MAX_ERROR_NUMBER_PROPERTY + " should >= 0");
-
- maxFilterRatio =
Util.getDoublePropertyOrDefault(jobProperties.get(MAX_FILTER_RATIO_PROPERTY),
- RoutineLoadJob.DEFAULT_MAX_FILTER_RATIO, MAX_FILTER_RATIO_PRED,
- MAX_FILTER_RATIO_PROPERTY + " should between 0 and 1");
-
- maxBatchIntervalS =
Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_INTERVAL_SEC_PROPERTY),
- RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND,
MAX_BATCH_INTERVAL_PRED,
- MAX_BATCH_INTERVAL_SEC_PROPERTY + " should >= 1");
-
- maxBatchRows =
Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_ROWS_PROPERTY),
- RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, MAX_BATCH_ROWS_PRED,
- MAX_BATCH_ROWS_PROPERTY + " should > 200000");
-
- maxBatchSizeBytes =
Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
- RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
- MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 10GB");
-
- strictMode =
Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
- RoutineLoadJob.DEFAULT_STRICT_MODE,
- LoadStmt.STRICT_MODE + " should be a boolean");
- execMemLimit =
Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY),
- RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED,
- EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0");
-
- sendBatchParallelism = ((Long)
Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
-
ConnectContext.get().getSessionVariable().getSendBatchParallelism(),
SEND_BATCH_PARALLELISM_PRED,
- SEND_BATCH_PARALLELISM + " must be greater than
0")).intValue();
- loadToSingleTablet =
Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
- RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
- LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
-
- String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
- if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
- ConnectContext tmpCtx = new ConnectContext();
- if (Config.isCloudMode()) {
- tmpCtx.setCloudCluster(ConnectContext.get().getCloudCluster());
- }
-
tmpCtx.setCurrentUserIdentity(ConnectContext.get().getCurrentUserIdentity());
-
tmpCtx.getSessionVariable().setWorkloadGroup(inputWorkloadGroupStr);
- List<WorkloadGroup> wgList =
Env.getCurrentEnv().getWorkloadGroupMgr()
- .getWorkloadGroup(tmpCtx);
- if (wgList.size() == 0) {
- throw new UserException("Can not find workload group " +
inputWorkloadGroupStr);
- }
- workloadGroupName = inputWorkloadGroupStr;
- }
-
- if (ConnectContext.get() != null) {
- timezone = ConnectContext.get().getSessionVariable().getTimeZone();
- }
- timezone =
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE,
timezone));
-
- fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
- }
-
- private void checkDataSourceProperties() throws UserException {
- this.dataSourceProperties.setTimezone(this.timezone);
- this.dataSourceProperties.analyze();
- }
-
- @Override
- public StmtType stmtType() {
- return StmtType.CREATE;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index ee15c69925b..5da8f90a0c9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
@@ -524,33 +523,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return kafkaRoutineLoadJob;
}
- public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt
stmt) throws UserException {
- // check db and table
- Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDBName());
-
- long id = Env.getCurrentEnv().getNextId();
- KafkaDataSourceProperties kafkaProperties =
(KafkaDataSourceProperties) stmt.getDataSourceProperties();
- KafkaRoutineLoadJob kafkaRoutineLoadJob;
- if (kafkaProperties.isMultiTable()) {
- kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
- db.getId(),
- kafkaProperties.getBrokerList(),
kafkaProperties.getTopic(), stmt.getUserInfo(), true);
- } else {
- OlapTable olapTable =
db.getOlapTableOrDdlException(stmt.getTableName());
- checkMeta(olapTable, stmt.getRoutineLoadDesc());
- long tableId = olapTable.getId();
- // init kafka routine load job
- kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
- db.getId(), tableId,
- kafkaProperties.getBrokerList(),
kafkaProperties.getTopic(), stmt.getUserInfo());
- }
- kafkaRoutineLoadJob.setOptional(stmt);
- kafkaRoutineLoadJob.checkCustomProperties();
- kafkaRoutineLoadJob.checkCustomPartition();
-
- return kafkaRoutineLoadJob;
- }
-
private void checkCustomPartition() throws UserException {
if (customKafkaPartitions.isEmpty()) {
return;
@@ -646,21 +618,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" +
UUID.randomUUID());
}
- @Override
- protected void setOptional(CreateRoutineLoadStmt stmt) throws
UserException {
- super.setOptional(stmt);
- KafkaDataSourceProperties kafkaDataSourceProperties
- = (KafkaDataSourceProperties) stmt.getDataSourceProperties();
- if
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
{
- setCustomKafkaPartitions(kafkaDataSourceProperties);
- }
- if
(MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
-
setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
- }
- // set group id if not specified
- this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" +
UUID.randomUUID());
- }
-
// this is an unprotected method which is called in the initialization
function
private void setCustomKafkaPartitions(KafkaDataSourceProperties
kafkaDataSourceProperties) throws LoadException {
@@ -822,8 +779,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
Map<String, String> copiedJobProperties =
Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
- this.isPartialUpdate =
BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+ this.isPartialUpdate =
BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
}
}
LOG.info("modify the properties of kafka routine load job: {},
jobProperties: {}, datasource properties: {}",
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index e74e8ff3e43..34cd9e5228f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.LoadStmt;
@@ -420,73 +419,6 @@ public abstract class RoutineLoadJob
}
}
- protected void setOptional(CreateRoutineLoadStmt stmt) throws
UserException {
- setRoutineLoadDesc(stmt.getRoutineLoadDesc());
- if (stmt.getDesiredConcurrentNum() != -1) {
- this.desireTaskConcurrentNum = stmt.getDesiredConcurrentNum();
- }
- if (stmt.getMaxErrorNum() != -1) {
- this.maxErrorNum = stmt.getMaxErrorNum();
- }
- if (stmt.getMaxFilterRatio() != -1) {
- this.maxFilterRatio = stmt.getMaxFilterRatio();
- }
- if (stmt.getMaxBatchIntervalS() != -1) {
- this.maxBatchIntervalS = stmt.getMaxBatchIntervalS();
- }
- if (stmt.getMaxBatchRows() != -1) {
- this.maxBatchRows = stmt.getMaxBatchRows();
- }
- if (stmt.getMaxBatchSize() != -1) {
- this.maxBatchSizeBytes = stmt.getMaxBatchSize();
- }
- if (stmt.getExecMemLimit() != -1) {
- this.execMemLimit = stmt.getExecMemLimit();
- }
- if (stmt.getSendBatchParallelism() > 0) {
- this.sendBatchParallelism = stmt.getSendBatchParallelism();
- }
- if (stmt.isLoadToSingleTablet()) {
- this.loadToSingleTablet = stmt.isLoadToSingleTablet();
- }
- jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone());
- jobProperties.put(LoadStmt.STRICT_MODE,
String.valueOf(stmt.isStrictMode()));
- jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM,
String.valueOf(this.sendBatchParallelism));
- jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET,
String.valueOf(this.loadToSingleTablet));
- jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
stmt.isPartialUpdate() ? "true" : "false");
- if (stmt.isPartialUpdate()) {
- this.isPartialUpdate = true;
- }
- jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(maxFilterRatio));
-
- FileFormatProperties fileFormatProperties =
stmt.getFileFormatProperties();
- if (fileFormatProperties instanceof CsvFileFormatProperties) {
- CsvFileFormatProperties csvFileFormatProperties =
(CsvFileFormatProperties) fileFormatProperties;
- jobProperties.put(FileFormatProperties.PROP_FORMAT, "csv");
- jobProperties.put(LoadStmt.KEY_ENCLOSE, new String(new
byte[]{csvFileFormatProperties.getEnclose()}));
- jobProperties.put(LoadStmt.KEY_ESCAPE, new String(new
byte[]{csvFileFormatProperties.getEscape()}));
- this.enclose = csvFileFormatProperties.getEnclose();
- this.escape = csvFileFormatProperties.getEscape();
- } else if (fileFormatProperties instanceof JsonFileFormatProperties) {
- JsonFileFormatProperties jsonFileFormatProperties =
(JsonFileFormatProperties) fileFormatProperties;
- jobProperties.put(FileFormatProperties.PROP_FORMAT, "json");
- jobProperties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
jsonFileFormatProperties.getJsonPaths());
- jobProperties.put(JsonFileFormatProperties.PROP_JSON_ROOT,
jsonFileFormatProperties.getJsonRoot());
- jobProperties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
-
String.valueOf(jsonFileFormatProperties.isStripOuterArray()));
- jobProperties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING,
- String.valueOf(jsonFileFormatProperties.isNumAsString()));
- jobProperties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE,
- String.valueOf(jsonFileFormatProperties.isFuzzyParse()));
- } else {
- throw new UserException("Invalid format type.");
- }
-
- if (!StringUtils.isEmpty(stmt.getWorkloadGroupName())) {
- jobProperties.put(WORKLOAD_GROUP, stmt.getWorkloadGroupName());
- }
- }
-
protected void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
if (routineLoadDesc != null) {
if (routineLoadDesc.getColumnsInfo() != null) {
@@ -1820,15 +1752,15 @@ public abstract class RoutineLoadJob
}
// 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
sb.append("PROPERTIES\n(\n");
- appendProperties(sb,
CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
desireTaskConcurrentNum, false);
- appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY,
maxErrorNum, false);
- appendProperties(sb, CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY,
maxFilterRatio, false);
- appendProperties(sb,
CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS,
false);
- appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY,
maxBatchRows, false);
- appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY,
maxBatchSizeBytes, false);
+ appendProperties(sb,
CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY,
desireTaskConcurrentNum, false);
+ appendProperties(sb, CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY,
maxErrorNum, false);
+ appendProperties(sb, CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY,
maxFilterRatio, false);
+ appendProperties(sb,
CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS,
false);
+ appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY,
maxBatchRows, false);
+ appendProperties(sb, CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY,
maxBatchSizeBytes, false);
appendProperties(sb, FileFormatProperties.PROP_FORMAT, getFormat(),
false);
if (isPartialUpdate) {
- appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS,
isPartialUpdate, false);
+ appendProperties(sb, CreateRoutineLoadInfo.PARTIAL_COLUMNS,
isPartialUpdate, false);
}
appendProperties(sb, JsonFileFormatProperties.PROP_JSON_PATHS,
getJsonPaths(), false);
appendProperties(sb, JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY,
isStripOuterArray(), false);
@@ -1923,14 +1855,14 @@ public abstract class RoutineLoadJob
sequenceCol == null ? STAR_STRING : sequenceCol);
// job properties defined in CreateRoutineLoadStmt
- jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
String.valueOf(isPartialUpdate));
- jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY,
String.valueOf(maxErrorNum));
-
jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY,
String.valueOf(maxBatchIntervalS));
- jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY,
String.valueOf(maxBatchRows));
- jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY,
String.valueOf(maxBatchSizeBytes));
-
jobProperties.put(CreateRoutineLoadStmt.CURRENT_CONCURRENT_NUMBER_PROPERTY,
+ jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS,
String.valueOf(isPartialUpdate));
+ jobProperties.put(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY,
String.valueOf(maxErrorNum));
+
jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY,
String.valueOf(maxBatchIntervalS));
+ jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY,
String.valueOf(maxBatchRows));
+ jobProperties.put(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY,
String.valueOf(maxBatchSizeBytes));
+
jobProperties.put(CreateRoutineLoadInfo.CURRENT_CONCURRENT_NUMBER_PROPERTY,
String.valueOf(currentTaskConcurrentNum));
-
jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
+
jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY,
String.valueOf(desireTaskConcurrentNum));
jobProperties.put(LoadStmt.EXEC_MEM_LIMIT,
String.valueOf(execMemLimit));
jobProperties.put(LoadStmt.KEY_IN_PARAM_MERGE_TYPE,
mergeType.toString());
@@ -1974,7 +1906,7 @@ public abstract class RoutineLoadJob
isMultiTable = true;
}
jobProperties.forEach((k, v) -> {
- if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+ if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
isPartialUpdate = Boolean.parseBoolean(v);
}
});
@@ -2015,35 +1947,35 @@ public abstract class RoutineLoadJob
// for ALTER ROUTINE LOAD
protected void modifyCommonJobProperties(Map<String, String>
jobProperties) {
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY))
{
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY))
{
this.desireTaskConcurrentNum = Integer.parseInt(
-
jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+
jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
}
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY)) {
this.maxErrorNum = Long.parseLong(
-
jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
+
jobProperties.remove(CreateRoutineLoadInfo.MAX_ERROR_NUMBER_PROPERTY));
}
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY)) {
this.maxFilterRatio = Double.parseDouble(
-
jobProperties.remove(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY));
-
this.jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(maxFilterRatio));
+
jobProperties.remove(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY));
+
this.jobProperties.put(CreateRoutineLoadInfo.MAX_FILTER_RATIO_PROPERTY,
String.valueOf(maxFilterRatio));
}
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY))
{
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY))
{
this.maxBatchIntervalS = Long.parseLong(
-
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
+
jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_INTERVAL_SEC_PROPERTY));
}
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) {
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)) {
this.maxBatchRows = Long.parseLong(
-
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
+
jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY));
}
- if
(jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) {
+ if
(jobProperties.containsKey(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)) {
this.maxBatchSizeBytes = Long.parseLong(
-
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY));
+
jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index a22eae6b009..7d5486f5565 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -17,7 +17,6 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@@ -203,38 +202,6 @@ public class RoutineLoadManager implements Writable {
info.getTableName());
}
- // cloud override
- public void createRoutineLoadJob(CreateRoutineLoadStmt
createRoutineLoadStmt)
- throws UserException {
- // check load auth
- if
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(),
- InternalCatalog.INTERNAL_CATALOG_NAME,
- createRoutineLoadStmt.getDBName(),
- createRoutineLoadStmt.getTableName(),
- PrivPredicate.LOAD)) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"LOAD",
- ConnectContext.get().getQualifiedUser(),
- ConnectContext.get().getRemoteIP(),
- createRoutineLoadStmt.getDBName(),
- createRoutineLoadStmt.getDBName() + ": " +
createRoutineLoadStmt.getTableName());
- }
-
- RoutineLoadJob routineLoadJob = null;
- LoadDataSourceType type =
LoadDataSourceType.valueOf(createRoutineLoadStmt.getTypeName());
- switch (type) {
- case KAFKA:
- routineLoadJob =
KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
- break;
- default:
- throw new UserException("Unknown data source type: " + type);
- }
-
- routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
- routineLoadJob.setComment(createRoutineLoadStmt.getComment());
- addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(),
- createRoutineLoadStmt.getTableName());
- }
-
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String
dbName, String tableName)
throws UserException {
writeLock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 2e13c3e7e6b..dacc706ea35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -444,7 +444,10 @@ public class CreateRoutineLoadInfo {
importSequenceStmt == null ? null :
importSequenceStmt.getSequenceColName());
}
- private void checkJobProperties() throws UserException {
+ /**
+ * checkJobProperties
+ */
+ public void checkJobProperties() throws UserException {
Optional<String> optional = jobProperties.keySet().stream().filter(
entity -> !PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
@@ -506,7 +509,7 @@ public class CreateRoutineLoadInfo {
this.workloadGroupName = inputWorkloadGroupStr;
}
- if (ConnectContext.get() != null) {
+ if (ConnectContext.get().getSessionVariable().getTimeZone() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
}
timezone =
TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE,
timezone));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 3540fc6bb29..0b87a3550bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -17,9 +17,7 @@
package org.apache.doris.qe;
-import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.catalog.Env;
@@ -44,10 +42,6 @@ public class DdlExecutor {
env.createTable((CreateTableStmt) ddlStmt);
} else if (ddlStmt instanceof CreateMaterializedViewStmt) {
env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt);
- } else if (ddlStmt instanceof AlterTableStmt) {
- env.alterTable((AlterTableStmt) ddlStmt);
- } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
-
env.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt)
ddlStmt);
} else {
LOG.warn("Unkown statement " + ddlStmt.getClass());
throw new DdlException("Unknown statement.");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 63452a5d59c..1d8b58257dc 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -17,10 +17,8 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.ImportSequenceStmt;
import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.ParseNode;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.UserIdentity;
@@ -42,6 +40,10 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.mysql.privilege.MockedAuth;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
@@ -66,6 +68,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -76,6 +79,7 @@ public class KafkaRoutineLoadJobTest {
private String jobName = "job1";
private String dbName = "db1";
private LabelName labelName = new LabelName(dbName, jobName);
+ private LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);
private String tableNameString = "table1";
private String topicName = "topic1";
private String serverAddress = "http://127.0.0.1:8080";
@@ -246,10 +250,11 @@ public class KafkaRoutineLoadJobTest {
public void testFromCreateStmt(@Mocked Env env,
@Injectable Database database,
@Injectable OlapTable table) throws UserException {
- CreateRoutineLoadStmt createRoutineLoadStmt =
initCreateRoutineLoadStmt();
+ CreateRoutineLoadInfo createRoutineLoadInfo =
initCreateRoutineLoadInfo();
+ createRoutineLoadInfo.validate(connectContext);
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator,
null, null, null, null, partitionNames, null,
LoadTask.MergeType.APPEND, sequenceStmt.getSequenceColName());
- Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc",
routineLoadDesc);
+ Deencapsulation.setField(createRoutineLoadInfo, "routineLoadDesc",
routineLoadDesc);
List<Pair<Integer, Long>> partitionIdToOffset = Lists.newArrayList();
List<PartitionInfo> kafkaPartitionInfoList = Lists.newArrayList();
for (String s : kafkaPartitionString.split(",")) {
@@ -261,7 +266,7 @@ public class KafkaRoutineLoadJobTest {
dsProperties.setKafkaPartitionOffsets(partitionIdToOffset);
Deencapsulation.setField(dsProperties, "brokerList", serverAddress);
Deencapsulation.setField(dsProperties, "topic", topicName);
- Deencapsulation.setField(createRoutineLoadStmt,
"dataSourceProperties", dsProperties);
+ Deencapsulation.setField(createRoutineLoadInfo,
"dataSourceProperties", dsProperties);
long dbId = 1L;
long tableId = 2L;
@@ -305,7 +310,7 @@ public class KafkaRoutineLoadJobTest {
}
};
- KafkaRoutineLoadJob kafkaRoutineLoadJob =
KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
+ KafkaRoutineLoadJob kafkaRoutineLoadJob =
KafkaRoutineLoadJob.fromCreateInfo(createRoutineLoadInfo, connectContext);
Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName());
Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId());
Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId());
@@ -316,12 +321,9 @@ public class KafkaRoutineLoadJobTest {
Assert.assertEquals(sequenceStmt.getSequenceColName(),
kafkaRoutineLoadJob.getSequenceCol());
}
- private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
- List<ParseNode> loadPropertyList = new ArrayList<>();
- loadPropertyList.add(columnSeparator);
- loadPropertyList.add(partitionNames);
+ private CreateRoutineLoadInfo initCreateRoutineLoadInfo() {
Map<String, String> properties = Maps.newHashMap();
-
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
+
properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
String typeName = LoadDataSourceType.KAFKA.name();
Map<String, String> customProperties = Maps.newHashMap();
@@ -329,11 +331,11 @@ public class KafkaRoutineLoadJobTest {
customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(),
serverAddress);
customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(),
kafkaPartitionString);
- CreateRoutineLoadStmt createRoutineLoadStmt = new
CreateRoutineLoadStmt(labelName, tableNameString,
-
loadPropertyList, properties,
-
typeName, customProperties,
-
LoadTask.MergeType.APPEND, "");
- Deencapsulation.setField(createRoutineLoadStmt, "name", jobName);
- return createRoutineLoadStmt;
+ LoadSeparator loadSeparator = new LoadSeparator(",");
+ Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
+ loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator);
+ CreateRoutineLoadInfo createRoutineLoadInfo = new
CreateRoutineLoadInfo(labelNameInfo, tableNameString,
+ loadPropertyMap, properties, typeName, customProperties,
LoadTask.MergeType.APPEND, "");
+ return createRoutineLoadInfo;
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index dd38d2fbbe7..e86d10d0c8c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -17,8 +17,6 @@
package org.apache.doris.load.routineload;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
-import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.ParseNode;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.UserIdentity;
@@ -40,13 +38,16 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.nereids.trees.plans.commands.info.LabelNameInfo;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadProperty;
+import org.apache.doris.nereids.trees.plans.commands.load.LoadSeparator;
import
org.apache.doris.nereids.trees.plans.commands.load.PauseRoutineLoadCommand;
import
org.apache.doris.nereids.trees.plans.commands.load.ResumeRoutineLoadCommand;
import
org.apache.doris.nereids.trees.plans.commands.load.StopRoutineLoadCommand;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
@@ -55,7 +56,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
-import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.logging.log4j.LogManager;
@@ -65,6 +65,7 @@ import org.junit.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -76,75 +77,6 @@ public class RoutineLoadManagerTest {
@Mocked
private SystemInfoService systemInfoService;
- @Test
- public void testAddJobByStmt(@Injectable AccessControllerManager
accessManager,
- @Injectable TResourceInfo tResourceInfo,
- @Mocked ConnectContext connectContext,
- @Mocked Env env) throws UserException {
- String jobName = "job1";
- String dbName = "db1";
- LabelName labelName = new LabelName(dbName, jobName);
- String tableNameString = "table1";
- List<ParseNode> loadPropertyList = new ArrayList<>();
- Separator columnSeparator = new Separator(",");
- loadPropertyList.add(columnSeparator);
- Map<String, String> properties = Maps.newHashMap();
-
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
- String typeName = LoadDataSourceType.KAFKA.name();
- Map<String, String> customProperties = Maps.newHashMap();
- String topicName = "topic1";
- customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(),
topicName);
- String serverAddress = "http://127.0.0.1:8080";
- customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(),
serverAddress);
- CreateRoutineLoadStmt createRoutineLoadStmt = new
CreateRoutineLoadStmt(labelName, tableNameString,
-
loadPropertyList, properties,
-
typeName, customProperties,
-
LoadTask.MergeType.APPEND, "");
- createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
-
- KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L,
jobName, 1L, 1L,
- serverAddress, topicName, UserIdentity.ADMIN);
-
- new MockUp<KafkaRoutineLoadJob>() {
- @Mock
- public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt
stmt) {
- return kafkaRoutineLoadJob;
- }
- };
-
- new Expectations() {
- {
- env.getAccessManager();
- minTimes = 0;
- result = accessManager;
- accessManager.checkTblPriv((ConnectContext) any, anyString,
anyString, anyString, PrivPredicate.LOAD);
- minTimes = 0;
- result = true;
- }
- };
- RoutineLoadManager routineLoadManager = new RoutineLoadManager();
- routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
-
- Map<String, RoutineLoadJob> idToRoutineLoadJob =
- Deencapsulation.getField(routineLoadManager,
"idToRoutineLoadJob");
- Assert.assertEquals(1, idToRoutineLoadJob.size());
- RoutineLoadJob routineLoadJob =
idToRoutineLoadJob.values().iterator().next();
- Assert.assertEquals(1L, routineLoadJob.getDbId());
- Assert.assertEquals(jobName, routineLoadJob.getName());
- Assert.assertEquals(1L, routineLoadJob.getTableId());
- Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE,
routineLoadJob.getState());
- Assert.assertEquals(true, routineLoadJob instanceof
KafkaRoutineLoadJob);
-
- Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob =
- Deencapsulation.getField(routineLoadManager,
"dbToNameToRoutineLoadJob");
- Assert.assertEquals(1, dbToNameToRoutineLoadJob.size());
- Assert.assertEquals(Long.valueOf(1L),
dbToNameToRoutineLoadJob.keySet().iterator().next());
- Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob =
dbToNameToRoutineLoadJob.get(1L);
- Assert.assertEquals(jobName,
nameToRoutineLoadJob.keySet().iterator().next());
- Assert.assertEquals(1, nameToRoutineLoadJob.values().size());
- Assert.assertEquals(routineLoadJob,
nameToRoutineLoadJob.values().iterator().next().get(0));
- }
-
@Test
public void testCreateJobAuthDeny(@Injectable AccessControllerManager
accessManager,
@Injectable TResourceInfo tResourceInfo,
@@ -152,25 +84,24 @@ public class RoutineLoadManagerTest {
@Mocked Env env) {
String jobName = "job1";
String dbName = "db1";
- LabelName labelName = new LabelName(dbName, jobName);
+ LabelNameInfo labelNameInfo = new LabelNameInfo(dbName, jobName);
String tableNameString = "table1";
List<ParseNode> loadPropertyList = new ArrayList<>();
Separator columnSeparator = new Separator(",");
loadPropertyList.add(columnSeparator);
Map<String, String> properties = Maps.newHashMap();
-
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
+
properties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
String typeName = LoadDataSourceType.KAFKA.name();
Map<String, String> customProperties = Maps.newHashMap();
String topicName = "topic1";
customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(),
topicName);
String serverAddress = "http://127.0.0.1:8080";
customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(),
serverAddress);
- CreateRoutineLoadStmt createRoutineLoadStmt = new
CreateRoutineLoadStmt(labelName, tableNameString,
-
loadPropertyList, properties,
-
typeName, customProperties,
-
LoadTask.MergeType.APPEND, "");
- createRoutineLoadStmt.setOrigStmt(new OriginStatement("dummy", 0));
-
+ LoadSeparator loadSeparator = new LoadSeparator(",");
+ Map<String, LoadProperty> loadPropertyMap = new HashMap<>();
+ loadPropertyMap.put(loadSeparator.getClass().getName(), loadSeparator);
+ CreateRoutineLoadInfo createRoutineLoadInfo = new
CreateRoutineLoadInfo(labelNameInfo, tableNameString,
+ loadPropertyMap, properties, typeName, customProperties,
LoadTask.MergeType.APPEND, "");
new Expectations() {
{
@@ -184,7 +115,8 @@ public class RoutineLoadManagerTest {
};
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
try {
- routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt);
+ createRoutineLoadInfo.checkJobProperties();
+ routineLoadManager.createRoutineLoadJob(createRoutineLoadInfo,
connectContext);
Assert.fail();
} catch (LoadException | DdlException e) {
Assert.fail();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
index a93b4b3a3d2..8a1550d48f5 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java
@@ -17,11 +17,11 @@
package org.apache.doris.persist;
-import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
import com.google.common.collect.Maps;
import org.junit.Assert;
@@ -48,7 +48,7 @@ public class AlterRoutineLoadOperationLogTest {
long jobId = 1000;
Map<String, String> jobProperties = Maps.newHashMap();
-
jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
"5");
+
jobProperties.put(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY,
"5");
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "0, 1");
@@ -71,7 +71,7 @@ public class AlterRoutineLoadOperationLogTest {
AlterRoutineLoadJobOperationLog log2 =
AlterRoutineLoadJobOperationLog.read(in);
Assert.assertEquals(1, log2.getJobProperties().size());
- Assert.assertEquals("5",
log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
+ Assert.assertEquals("5",
log2.getJobProperties().get(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
KafkaDataSourceProperties kafkaDataSourceProperties =
(KafkaDataSourceProperties) log2.getDataSourceProperties();
Assert.assertEquals(null, kafkaDataSourceProperties.getBrokerList());
Assert.assertEquals(null, kafkaDataSourceProperties.getTopic());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]