dataroaring commented on code in PR #39756: URL: https://github.com/apache/doris/pull/39756#discussion_r1775069398
########## be/src/http/action/stream_load.cpp: ########## @@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_enable_profile(false); } } - if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { + + StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = { + {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, + {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, + {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; Review Comment: put inside if below. ########## be/src/http/action/stream_load.cpp: ########## @@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_enable_profile(false); } } - if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { + + StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = { + {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, + {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, + {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; + if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { + std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE); + auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str); + if (iter != unique_key_update_mode_map.end()) { + TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second; + if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { + // check constraints when flexible partial update is enabled + if (ctx->format != TFileFormatType::FORMAT_JSON) { + return Status::InvalidArgument( + "flexible partial update only support json format as input file " + "currently"); + } + if (!http_req->header(HTTP_FUZZY_PARSE).empty() && + iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { + return Status::InvalidArgument( + "Don't support flexible partial update when fuzzy_parse is enabled"); + } + if (!http_req->header(HTTP_COLUMNS).empty()) { + 
return Status::InvalidArgument( + "Don't support flexible partial update when columns is specified"); + } + if (!http_req->header(HTTP_JSONPATHS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when jsonpaths is specified"); + } + if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when hidden_columns is " + "specified"); + } + if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when " + "function_column.sequence_col is specified"); + } + if (!http_req->header(HTTP_MERGE_TYPE).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when " + "merge_type is specified"); + } + } + request.__set_unique_key_update_mode(unique_key_update_mode); + } else { + return Status::InvalidArgument( + "Invalid unique_key_partial_mode {}, must be UPSERT, PARTIAL_UPDATE or " + "FLEXIBLE_PARTIAL_UPDATE", Review Comment: PARTIAL_UPDATE -> UPDATE_FIXED_COLUMNS ########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ -3559,6 +3559,12 @@ private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder sb.append(olapTable.getEnableUniqueKeyMergeOnWrite()).append("\""); } + // enable_unique_key_skip_bitmap, always print this property for merge-on-write unique table + if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) { + sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_SKIP_BITMAP_COLUMN).append("\" = \""); + sb.append(olapTable.getEnableUniqueKeySkipBitmap()).append("\""); + } Review Comment: We should print it only if it is true. 
########## fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java: ########## @@ -126,8 +127,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde && !destTable.hasDeleteSign()) { throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete."); } - - if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) { + TUniqueKeyUpdateMode uniquekeyUpdateMode = taskInfo.getUniqueKeyUpdateMode(); + if (uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS + && destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) { throw new UserException("Table " + destTable.getName() + " has sequence column, need to specify the sequence column"); } Review Comment: We should rethink here, why we handle fixed partial update and flexible partial update differently. ########## be/src/http/action/stream_load.cpp: ########## @@ -623,13 +623,70 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_enable_profile(false); } } - if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { + + StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = { + {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, + {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, + {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; + if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { + std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE); + auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str); + if (iter != unique_key_update_mode_map.end()) { + TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second; + if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { + // check constraints when flexible partial update is enabled + if (ctx->format != TFileFormatType::FORMAT_JSON) { + 
return Status::InvalidArgument( + "flexible partial update only support json format as input file " + "currently"); + } + if (!http_req->header(HTTP_FUZZY_PARSE).empty() && + iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { + return Status::InvalidArgument( + "Don't support flexible partial update when fuzzy_parse is enabled"); + } + if (!http_req->header(HTTP_COLUMNS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when columns is specified"); + } + if (!http_req->header(HTTP_JSONPATHS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when jsonpaths is specified"); + } + if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when hidden_columns is " + "specified"); + } + if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when " + "function_column.sequence_col is specified"); + } + if (!http_req->header(HTTP_MERGE_TYPE).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when " + "merge_type is specified"); + } + } + request.__set_unique_key_update_mode(unique_key_update_mode); + } else { + return Status::InvalidArgument( + "Invalid unique_key_partial_mode {}, must be UPSERT, PARTIAL_UPDATE or " + "FLEXIBLE_PARTIAL_UPDATE", Review Comment: UPDATE_FLEXIBLE_COLUMNS ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java: ########## @@ -704,6 +705,14 @@ public static ColumnDefinition newVersionColumnDefinition(AggregateType aggregat Optional.of(new DefaultValue(DefaultValue.ZERO_NUMBER)), "doris version hidden column", false); } + // used in CreateTableInfo.validate(), specify the default value as DefaultValue.NULL_DEFAULT_VALUE + // becasue ColumnDefinition.validate() will check that bitmap type column don't set default value + 
// and then set the default value of that column to bitmap_empty() + public static ColumnDefinition newSkipBitmapColumnDef(AggregateType aggregateType) { + return new ColumnDefinition(Column.SKIP_BITMAP_COL, BitmapType.INSTANCE, false, aggregateType, false, + Optional.of(DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE), "doris skip bitmap hidden column", false); + } Review Comment: We already support EMPTY_BITMAP. ########## fe/fe-common/src/main/java/org/apache/doris/common/Config.java: ########## @@ -1422,6 +1422,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean enable_hidden_version_column_by_default = true; + /** + * Whether to add a skip bitmap column when create merge-on-write unique table + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_skip_bitmap_column_by_default = true; Review Comment: We should set it to false for now. ########## be/src/olap/memtable.cpp: ########## @@ -293,6 +331,12 @@ size_t MemTable::_sort() { auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos); std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func); _last_sorted_pos = _row_in_blocks.size(); + { + std::string res; + for (const auto& row : _row_in_blocks) { + res += fmt::format(",{}", row->_row_pos); + } + } Review Comment: Is this block useless? `res` is built but never read. ########## fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java: ########## @@ -163,8 +166,21 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde } } + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) { + throw new UserException("Flexible partial update can only support table with skip bitmap hidden column." + + " But table " + destTable.getName() + " doesn't have it"); Review Comment: Provide a hint to the user on how to add the hidden column.
########## be/src/vec/exec/format/json/new_json_reader.cpp: ########## @@ -774,10 +782,34 @@ Status NewJsonReader::_set_column_value(rapidjson::Value& objectValue, Block& bl } has_valid_value = true; } else { - // not found, filling with default value - RETURN_IF_ERROR(_fill_missing_column(slot_desc, column_ptr, valid)); - if (!(*valid)) { - return Status::OK(); + if (_should_process_skip_bitmap_col()) { + // not found, skip this column in flexible partial update + if (slot_desc->is_key() && !slot_desc->is_auto_increment()) { + RETURN_IF_ERROR( + _append_error_msg(objectValue, + "The key columns can not be ommited in flexible " + "partial update, missing key column: {}", + slot_desc->col_name(), valid)); + // remove this line in block + for (int i = 0; i < block.columns(); ++i) { + auto column = block.get_by_position(i).column->assume_mutable(); + if (column->size() != cur_row_count) { + DCHECK(column->size() == cur_row_count + 1); + column->pop_back(1); + DCHECK(column->size() == cur_row_count); + } + } Review Comment: This should respect max_filter_ratio. ########## fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java: ########## @@ -238,7 +248,15 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { } strBuilder.append(prefix + " TUPLE ID: " + tupleDescriptor.getId() + "\n"); strBuilder.append(prefix + " " + DataPartition.RANDOM.getExplainString(explainLevel)); + boolean isPartialUpdate = uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT; strBuilder.append(prefix + " IS_PARTIAL_UPDATE: " + isPartialUpdate); + if (isPartialUpdate) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { + strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FIXED_PARTIAL_UPDATE"); + } else { + strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FLEXIBLE_PARTIAL_UPDATE"); Review Comment: This should use the same names as the stream load option (UPDATE_FIXED_COLUMNS / UPDATE_FLEXIBLE_COLUMNS). -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org