dataroaring commented on code in PR #39756:
URL: https://github.com/apache/doris/pull/39756#discussion_r1775069398


##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
             request.__set_enable_profile(false);
         }
     }
-    if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+    StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+            {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+            {"UPDATE_FIXED_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+            {"UPDATE_FLEXIBLE_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};

Review Comment:
   Put this map inside the `if` block below, since it is only used there.



##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
             request.__set_enable_profile(false);
         }
     }
-    if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+    StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+            {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+            {"UPDATE_FIXED_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+            {"UPDATE_FLEXIBLE_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
+    if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
+        std::string unique_key_update_mode_str = 
http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
+        auto iter = 
unique_key_update_mode_map.find(unique_key_update_mode_str);
+        if (iter != unique_key_update_mode_map.end()) {
+            TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
+            if (unique_key_update_mode == 
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
+                // check constraints when flexible partial update is enabled
+                if (ctx->format != TFileFormatType::FORMAT_JSON) {
+                    return Status::InvalidArgument(
+                            "flexible partial update only support json format 
as input file "
+                            "currently");
+                }
+                if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
+                    iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
fuzzy_parse is enabled");
+                }
+                if (!http_req->header(HTTP_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
columns is specified");
+                }
+                if (!http_req->header(HTTP_JSONPATHS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
jsonpaths is specified");
+                }
+                if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
hidden_columns is "
+                            "specified");
+                }
+                if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "function_column.sequence_col is specified");
+                }
+                if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "merge_type is specified");
+                }
+            }
+            request.__set_unique_key_update_mode(unique_key_update_mode);
+        } else {
+            return Status::InvalidArgument(
+                    "Invalid unique_key_partial_mode {}, must be UPSERT, 
PARTIAL_UPDATE or "
+                    "FLEXIBLE_PARTIAL_UPDATE",

Review Comment:
   PARTIAL_UPDATE -> UPDATE_FIXED_COLUMNS



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -3559,6 +3559,12 @@ private static void addOlapTablePropertyInfo(OlapTable 
olapTable, StringBuilder
             sb.append(olapTable.getEnableUniqueKeyMergeOnWrite()).append("\"");
         }
 
+        // enable_unique_key_skip_bitmap, always print this property for 
merge-on-write unique table
+        if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && 
olapTable.getEnableUniqueKeyMergeOnWrite()) {
+            
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_SKIP_BITMAP_COLUMN).append("\"
 = \"");
+            sb.append(olapTable.getEnableUniqueKeySkipBitmap()).append("\"");
+        }

Review Comment:
   We should print it only if it is true.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java:
##########
@@ -126,8 +127,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int 
fragmentInstanceIdInde
                 && !destTable.hasDeleteSign()) {
             throw new AnalysisException("load by MERGE or DELETE need to 
upgrade table to support batch delete.");
         }
-
-        if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && 
destTable.getSequenceMapCol() == null) {
+        TUniqueKeyUpdateMode uniquekeyUpdateMode = 
taskInfo.getUniqueKeyUpdateMode();
+        if (uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
+                && destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && 
destTable.getSequenceMapCol() == null) {
             throw new UserException("Table " + destTable.getName()
                     + " has sequence column, need to specify the sequence 
column");
         }

Review Comment:
   We should rethink this: why do we handle fixed partial update and flexible 
partial update differently here?



##########
be/src/http/action/stream_load.cpp:
##########
@@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
             request.__set_enable_profile(false);
         }
     }
-    if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
+
+    StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
+            {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
+            {"UPDATE_FIXED_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
+            {"UPDATE_FLEXIBLE_COLUMNS", 
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
+    if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
+        std::string unique_key_update_mode_str = 
http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
+        auto iter = 
unique_key_update_mode_map.find(unique_key_update_mode_str);
+        if (iter != unique_key_update_mode_map.end()) {
+            TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
+            if (unique_key_update_mode == 
TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
+                // check constraints when flexible partial update is enabled
+                if (ctx->format != TFileFormatType::FORMAT_JSON) {
+                    return Status::InvalidArgument(
+                            "flexible partial update only support json format 
as input file "
+                            "currently");
+                }
+                if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
+                    iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
fuzzy_parse is enabled");
+                }
+                if (!http_req->header(HTTP_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
columns is specified");
+                }
+                if (!http_req->header(HTTP_JSONPATHS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
jsonpaths is specified");
+                }
+                if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when 
hidden_columns is "
+                            "specified");
+                }
+                if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "function_column.sequence_col is specified");
+                }
+                if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
+                    return Status::InvalidArgument(
+                            "Don't support flexible partial update when "
+                            "merge_type is specified");
+                }
+            }
+            request.__set_unique_key_update_mode(unique_key_update_mode);
+        } else {
+            return Status::InvalidArgument(
+                    "Invalid unique_key_partial_mode {}, must be UPSERT, 
PARTIAL_UPDATE or "
+                    "FLEXIBLE_PARTIAL_UPDATE",

Review Comment:
   FLEXIBLE_PARTIAL_UPDATE -> UPDATE_FLEXIBLE_COLUMNS



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java:
##########
@@ -704,6 +705,14 @@ public static ColumnDefinition 
newVersionColumnDefinition(AggregateType aggregat
                 Optional.of(new DefaultValue(DefaultValue.ZERO_NUMBER)), 
"doris version hidden column", false);
     }
 
+    // used in CreateTableInfo.validate(), specify the default value as 
DefaultValue.NULL_DEFAULT_VALUE
+    // becasue ColumnDefinition.validate() will check that bitmap type column 
don't set default value
+    // and then set the default value of that column to bitmap_empty()
+    public static ColumnDefinition newSkipBitmapColumnDef(AggregateType 
aggregateType) {
+        return new ColumnDefinition(Column.SKIP_BITMAP_COL, 
BitmapType.INSTANCE, false, aggregateType, false,
+                Optional.of(DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE), "doris 
skip bitmap hidden column", false);
+    }

Review Comment:
   We already support EMPTY_BITMAP.



##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1422,6 +1422,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static boolean enable_hidden_version_column_by_default = true;
 
+    /**
+     * Whether to add a skip bitmap column when create merge-on-write unique 
table
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_skip_bitmap_column_by_default = true;

Review Comment:
   Let's set it to false for now.



##########
be/src/olap/memtable.cpp:
##########
@@ -293,6 +331,12 @@ size_t MemTable::_sort() {
     auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
     std::inplace_merge(_row_in_blocks.begin(), new_row_it, 
_row_in_blocks.end(), cmp_func);
     _last_sorted_pos = _row_in_blocks.size();
+    {
+        std::string res;
+        for (const auto& row : _row_in_blocks) {
+            res += fmt::format(",{}", row->_row_pos);
+        }
+    }

Review Comment:
   Useless code? `res` is built but never used.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java:
##########
@@ -163,8 +166,21 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int 
fragmentInstanceIdInde
             }
         }
 
+        if (uniquekeyUpdateMode == 
TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && 
!destTable.hasSkipBitmapColumn()) {
+            throw new UserException("Flexible partial update can only support 
table with skip bitmap hidden column."
+                    + " But table " + destTable.getName() + " doesn't have 
it");

Review Comment:
   Provide a hint to the user on how to add the hidden column.



##########
be/src/vec/exec/format/json/new_json_reader.cpp:
##########
@@ -774,10 +782,34 @@ Status NewJsonReader::_set_column_value(rapidjson::Value& 
objectValue, Block& bl
             }
             has_valid_value = true;
         } else {
-            // not found, filling with default value
-            RETURN_IF_ERROR(_fill_missing_column(slot_desc, column_ptr, 
valid));
-            if (!(*valid)) {
-                return Status::OK();
+            if (_should_process_skip_bitmap_col()) {
+                // not found, skip this column in flexible partial update
+                if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
+                    RETURN_IF_ERROR(
+                            _append_error_msg(objectValue,
+                                              "The key columns can not be 
ommited in flexible "
+                                              "partial update, missing key 
column: {}",
+                                              slot_desc->col_name(), valid));
+                    // remove this line in block
+                    for (int i = 0; i < block.columns(); ++i) {
+                        auto column = 
block.get_by_position(i).column->assume_mutable();
+                        if (column->size() != cur_row_count) {
+                            DCHECK(column->size() == cur_row_count + 1);
+                            column->pop_back(1);
+                            DCHECK(column->size() == cur_row_count);
+                        }
+                    }

Review Comment:
   This should follow max_filter_ratio.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -238,7 +248,15 @@ public String getExplainString(String prefix, 
TExplainLevel explainLevel) {
         }
         strBuilder.append(prefix + "  TUPLE ID: " + tupleDescriptor.getId() + 
"\n");
         strBuilder.append(prefix + "  " + 
DataPartition.RANDOM.getExplainString(explainLevel));
+        boolean isPartialUpdate = uniqueKeyUpdateMode != 
TUniqueKeyUpdateMode.UPSERT;
         strBuilder.append(prefix + "  IS_PARTIAL_UPDATE: " + isPartialUpdate);
+        if (isPartialUpdate) {
+            if (uniqueKeyUpdateMode == 
TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
+                strBuilder.append(prefix + "  PARTIAL_UPDATE_MODE: 
FIXED_PARTIAL_UPDATE");
+            } else {
+                strBuilder.append(prefix + "  PARTIAL_UPDATE_MODE: 
FLEXIBLE_PARTIAL_UPDATE");

Review Comment:
   These should be the same as the option names in stream load.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to