This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fd899995be028c060fa016b7ae029fe07edf4e7f
Author: Calvin Kirs <acm_mas...@163.com>
AuthorDate: Thu Aug 10 17:41:53 2023 +0800

    [Feature](Routine Load)Support Partial Update (#22785)
---
 .../Load/ALTER-ROUTINE-LOAD.md                     |  1 +
 .../Load/CREATE-ROUTINE-LOAD.md                    |  3 +++
 .../Load/ALTER-ROUTINE-LOAD.md                     |  1 +
 .../Load/CREATE-ROUTINE-LOAD.md                    |  3 +++
 .../doris/analysis/AlterRoutineLoadStmt.java       | 29 ++++++++++++++++++++++
 .../doris/analysis/CreateRoutineLoadStmt.java      | 27 ++++++++++++++++----
 .../load/routineload/KafkaRoutineLoadJob.java      |  4 +++
 .../doris/load/routineload/RoutineLoadJob.java     | 15 ++++++++++-
 8 files changed, 77 insertions(+), 6 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
index 3a9d4d79cf..0e45d21245 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
@@ -69,6 +69,7 @@ FROM data_source
    10. `timezone`
    11. `num_as_string`
    12. `fuzzy_parse`
+   13. `partial_columns`
 
 
 4. `data_source`
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 3d0bb77a0c..7acd012770 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -249,6 +249,9 @@ FROM data_source [data_source_properties]
   11. `load_to_single_tablet`
       Boolean type, True means that one task can only load data to one tablet 
in the corresponding partition at a time. The default value is false. This 
parameter can only be set when loading data into the OLAP table with random 
partition.
 
+  12. `partial_columns`
+      Boolean type. True means that partial column update is used; the default 
value is false. This parameter can only be set when the table model 
is Unique and Merge-on-Write is enabled. Multi-table load does not support this 
parameter.
+  
 - `FROM data_source [data_source_properties]`
 
   The type of data source. Currently supports:
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
index fbf1e57969..b52ce004d7 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD.md
@@ -69,6 +69,7 @@ FROM data_source
     10. `timezone`
     11. `num_as_string`
     12. `fuzzy_parse`
+    13. `partial_columns`
 
 
 4. `data_source`
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
index 8f0f034d32..1206c53305 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CREATE-ROUTINE-LOAD.md
@@ -248,6 +248,9 @@ FROM data_source [data_source_properties]
 
       布尔类型,为 true 表示支持一个任务只导入数据到对应分区的一个 tablet,默认值为 false,该参数只允许在对带有 random 
分区的 olap 表导数的时候设置。
 
+  12. `partial_columns`
+      布尔类型,为 true 表示使用部分列更新,默认值为 false,该参数只允许在表模型为 Unique 且采用 Merge on Write 
时设置。一流多表不支持此参数。
+
 - `FROM data_source [data_source_properties]`
 
   数据源的类型。当前支持:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 064dc52bc5..7da268dcf2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
@@ -59,6 +62,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
             .add(CreateRoutineLoadStmt.NUM_AS_STRING)
             .add(CreateRoutineLoadStmt.FUZZY_PARSE)
+            .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .build();
@@ -67,6 +71,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
     private final Map<String, String> jobProperties;
     private final Map<String, String> dataSourceMapProperties;
 
+    private boolean isPartialUpdate;
+
     // save analyzed job properties.
     // analyzed data source properties are saved in dataSourceProperties.
     private Map<String, String> analyzedJobProperties = Maps.newHashMap();
@@ -76,6 +82,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         this.labelName = labelName;
         this.jobProperties = jobProperties != null ? jobProperties : 
Maps.newHashMap();
         this.dataSourceMapProperties = dataSourceProperties != null ? 
dataSourceProperties : Maps.newHashMap();
+        this.isPartialUpdate = 
this.jobProperties.getOrDefault(CreateRoutineLoadStmt.PARTIAL_COLUMNS, "false")
+                .equalsIgnoreCase("true");
     }
 
     public String getDbName() {
@@ -111,12 +119,29 @@ public class AlterRoutineLoadStmt extends DdlStmt {
         checkJobProperties();
         // check data source properties
         checkDataSourceProperties();
+        checkPartialUpdate();
 
         if (analyzedJobProperties.isEmpty() && 
MapUtils.isEmpty(dataSourceMapProperties)) {
             throw new AnalysisException("No properties are specified");
         }
     }
 
+    private void checkPartialUpdate() throws UserException {
+        if (!isPartialUpdate) {
+            return;
+        }
+        RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
+                .getJob(getDbName(), getLabel());
+        if (job.isMultiTable()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is not 
supported in multi-table load.");
+        }
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException(job.getDbFullName());
+        Table table = db.getTableOrAnalysisException(job.getTableName());
+        if (isPartialUpdate && !((OlapTable) 
table).getEnableUniqueKeyMergeOnWrite()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is only 
supported in unique table MoW");
+        }
+    }
+
     private void checkJobProperties() throws UserException {
         Optional<String> optional = jobProperties.keySet().stream().filter(
                 entity -> 
!CONFIGURABLE_JOB_PROPERTIES_SET.contains(entity)).findFirst();
@@ -203,6 +228,10 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             boolean fuzzyParse = 
Boolean.parseBoolean(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE));
             analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE, 
String.valueOf(fuzzyParse));
         }
+        if (jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+            analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
+                    String.valueOf(isPartialUpdate));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index e8144cd00b..b1fb17c58e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -38,6 +38,7 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -106,6 +107,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String NUM_AS_STRING = "num_as_string";
     public static final String FUZZY_PARSE = "fuzzy_parse";
 
+    public static final String PARTIAL_COLUMNS = "partial_columns";
+
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
@@ -131,6 +134,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(EXEC_MEM_LIMIT_PROPERTY)
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
+            .add(PARTIAL_COLUMNS)
             .build();
 
     private final LabelName labelName;
@@ -157,8 +161,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     /**
      * RoutineLoad support json data.
      * Require Params:
-     *   1) dataFormat = "json"
-     *   2) jsonPaths = "$.XXX.xxx"
+     * 1) dataFormat = "json"
+     * 2) jsonPaths = "$.XXX.xxx"
      */
     private String format = ""; //default is csv.
     private String jsonPaths = "";
@@ -167,6 +171,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
 
+    /**
+     * support partial columns load(Only Unique Key Columns)
+     */
+    @Getter
+    private boolean isPartialUpdate = false;
+
     private String comment = "";
 
     private LoadTask.MergeType mergeType;
@@ -196,6 +206,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
                 .createDataSource(typeName, dataSourceProperties, 
this.isMultiTable);
         this.mergeType = mergeType;
+        this.isPartialUpdate = 
this.jobProperties.getOrDefault(PARTIAL_COLUMNS, 
"false").equalsIgnoreCase("true");
         if (comment != null) {
             this.comment = comment;
         }
@@ -323,6 +334,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         dbName = labelName.getDbName();
         name = labelName.getLabelName();
         Database db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
+        if (isPartialUpdate && isMultiTable) {
+            throw new AnalysisException("Partial update is not supported in 
multi-table load.");
+        }
         if (isMultiTable) {
             return;
         }
@@ -339,6 +353,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 && !(table.getType() == Table.TableType.OLAP && ((OlapTable) 
table).hasDeleteSign())) {
             throw new AnalysisException("load by MERGE or DELETE need to 
upgrade table to support batch delete.");
         }
+        if (isPartialUpdate && !((OlapTable) 
table).getEnableUniqueKeyMergeOnWrite()) {
+            throw new AnalysisException("load by PARTIAL_COLUMNS is only 
supported in unique table MoW");
+        }
     }
 
     public void checkLoadProperties() throws UserException {
@@ -409,9 +426,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             }
         }
         routineLoadDesc = new RoutineLoadDesc(columnSeparator, lineDelimiter, 
importColumnsStmt,
-                        precedingImportWhereStmt, importWhereStmt,
-                        partitionNames, importDeleteOnStmt == null ? null : 
importDeleteOnStmt.getExpr(), mergeType,
-                        importSequenceStmt == null ? null : 
importSequenceStmt.getSequenceColName());
+                precedingImportWhereStmt, importWhereStmt,
+                partitionNames, importDeleteOnStmt == null ? null : 
importDeleteOnStmt.getExpr(), mergeType,
+                importSequenceStmt == null ? null : 
importSequenceStmt.getSequenceColName());
     }
 
     private void checkJobProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 0d6cb534cf..43cd058589 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -53,6 +53,7 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -683,6 +684,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             Map<String, String> copiedJobProperties = 
Maps.newHashMap(jobProperties);
             modifyCommonJobProperties(copiedJobProperties);
             this.jobProperties.putAll(copiedJobProperties);
+            if 
(jobProperties.containsKey(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+                this.isPartialUpdate = 
BooleanUtils.toBoolean(jobProperties.get(CreateRoutineLoadStmt.PARTIAL_COLUMNS));
+            }
         }
         LOG.info("modify the properties of kafka routine load job: {}, 
jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 56b3cc2c01..9355dd7250 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -196,6 +196,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;
 
+    protected boolean isPartialUpdate = false;
+
     protected String sequenceCol;
 
     /**
@@ -339,6 +341,10 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 
String.valueOf(this.execMemLimit));
         jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 
String.valueOf(this.sendBatchParallelism));
         jobProperties.put(LoadStmt.LOAD_TO_SINGLE_TABLET, 
String.valueOf(this.loadToSingleTablet));
+        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, 
stmt.isPartialUpdate() ? "true" : "false");
+        if (stmt.isPartialUpdate()) {
+            this.isPartialUpdate = true;
+        }
 
         if (Strings.isNullOrEmpty(stmt.getFormat()) || 
stmt.getFormat().equals("csv")) {
             jobProperties.put(PROPS_FORMAT, "csv");
@@ -636,7 +642,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
     @Override
     public boolean isPartialUpdate() {
-        return false;
+        return isPartialUpdate;
     }
 
     @Override
@@ -1484,6 +1490,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, 
maxBatchRows, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, 
maxBatchSizeBytes, false);
         appendProperties(sb, PROPS_FORMAT, getFormat(), false);
+        if (isPartialUpdate) {
+            appendProperties(sb, CreateRoutineLoadStmt.PARTIAL_COLUMNS, 
isPartialUpdate, false);
+        }
         appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
         appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), 
false);
         appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
@@ -1562,6 +1571,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             jobProperties.put("columnSeparator", columnSeparator == null ? 
"\t" : columnSeparator.toString());
             jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : 
lineDelimiter.toString());
         }
+        jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, 
String.valueOf(isPartialUpdate));
         jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
         jobProperties.put("maxBatchIntervalS", 
String.valueOf(maxBatchIntervalS));
         jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
@@ -1712,6 +1722,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             String key = Text.readString(in);
             String value = Text.readString(in);
             jobProperties.put(key, value);
+            if (key.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+                isPartialUpdate = Boolean.parseBoolean(value);
+            }
         }
 
         size = in.readInt();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to