This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d9bd1fad67a [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383)
d9bd1fad67a is described below

commit d9bd1fad67af08d4d8027035e39855535de4a433
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Mon Nov 6 09:56:46 2023 +0800

    [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383)
    
    When a Unique Key table has the `enable_unique_key_merge_on_write`
    property enabled, the aggregation type of its value (non-key) columns is
    NONE. Therefore, Spark Load must set the aggregation type of those
    columns to `REPLACE`.
---
 .../org/apache/doris/load/loadv2/SparkLoadPendingTask.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 8e9599f774b..32749fd8a77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask {
             long indexId = entry.getKey();
             int schemaHash = table.getSchemaHashByIndexId(indexId);
 
+            boolean changeAggType = 
table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+                    && 
table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
             // columns
             List<EtlColumn> etlColumns = Lists.newArrayList();
             for (Column column : entry.getValue()) {
-                etlColumns.add(createEtlColumn(column));
+                etlColumns.add(createEtlColumn(column, changeAggType));
             }
 
             // check distribution type
@@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask {
         return etlIndexes;
     }
 
-    private EtlColumn createEtlColumn(Column column) {
+    private EtlColumn createEtlColumn(Column column, boolean changeAggType) {
         // column name
         String name = column.getName().toLowerCase(Locale.ROOT);
         // column type
@@ -304,7 +307,11 @@ public class SparkLoadPendingTask extends LoadTask {
         // aggregation type
         String aggregationType = null;
         if (column.getAggregationType() != null) {
-            aggregationType = column.getAggregationType().toString();
+            if (changeAggType && !column.isKey()) {
+                aggregationType = AggregateType.REPLACE.toSql();
+            } else {
+                aggregationType = column.getAggregationType().toString();
+            }
         }
 
         // default value


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to