This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 57e02cd5e31f871bc762aee6dec14dc6cb69d851
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Jun 14 09:27:09 2024 +0800

    [fix](partial update) partial update should not read old fields from rows with delete sign (#36210)

    Issue Number: close #34296

    1. When a partial update fills in the missing fields, it would also read
       the old column values from rows that were written with a delete sign,
       so the newly written data became invisible as well.
    2. This problem was fixed in #24877 but reintroduced in #26721, and it
       went unnoticed because #26721 also changed the regression case to the
       wrong expected output.
    3. The fix in #24877 didn't take the handling of concurrent conflicts in
       the publish phase into account; this PR adds that handling along with
       the corresponding regression case.
---
 be/src/olap/base_tablet.cpp                        |  58 +++++++-
 be/src/olap/base_tablet.h                          |   4 +-
 be/src/olap/partial_update_info.h                  |  41 ++++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  27 +---
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +---
 .../partial_update_parallel_with_delete_sign.csv   |   5 +
 .../test_partial_update_delete_sign.out            |  82 ++++++++++-
 ...st_partial_update_delete_sign_with_conflict.out |  19 +++
 .../test_partial_update_delete_sign.groovy         | 117 ++++++++--------
 ...partial_update_delete_sign_with_conflict.groovy | 151 +++++++++++++++++++++
 10 files changed, 415 insertions(+), 116 deletions(-)
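In short, the rule the patch enforces when a partial update fills missing columns: a row whose newest version carries a delete sign contributes column defaults or NULL, never its stale values. The snippet below is an editorial, self-contained sketch of that decision, not part of the patch; OldCell and fill_missing_cell are illustrative stand-ins, not Doris APIs.

    #include <iostream>
    #include <optional>
    #include <string>

    // Stand-in for what the fill path sees per key: the value previously
    // stored in the missing column, plus the row's __DORIS_DELETE_SIGN__ flag.
    struct OldCell {
        std::string old_value;
        bool delete_signed = false;
    };

    // Mirrors the branch order added in generate_new_block_for_partial_update():
    // only rows without a delete sign may contribute their old values.
    std::optional<std::string> fill_missing_cell(const OldCell& cell,
                                                 const std::optional<std::string>& column_default,
                                                 bool nullable) {
        if (!cell.delete_signed) return cell.old_value; // normal path: reuse the old value
        if (column_default) return column_default;      // deleted row: use the default...
        if (nullable) return std::nullopt;              // ...or NULL if there is none...
        return std::string {};                          // ...otherwise the type's default
    }

    int main() {
        OldCell deleted {"stale", true};
        // Prints "foo": a delete-signed row never resurrects its stale value.
        std::cout << fill_missing_cell(deleted, std::string("foo"), true).value() << '\n';
    }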
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 89365e0f4e4..2fa90051165 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -798,8 +798,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     auto partial_update_info = rowset_writer->get_partial_update_info();
     DCHECK(partial_update_info);
     RETURN_IF_ERROR(generate_new_block_for_partial_update(
-            rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
-            read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+            rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
+            rsid_to_rowset, &block));
     RETURN_IF_ERROR(sort_block(block, ordered_block));
     RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
     if (new_generated_rows != rowset_writer->num_rows()) {
@@ -928,9 +928,8 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t
 }
 
 Status BaseTablet::generate_new_block_for_partial_update(
-        TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-        const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
-        const PartialUpdateReadPlan& read_plan_update,
+        TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+        const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
         vectorized::Block* output_block) {
     // do partial update related works
@@ -940,6 +939,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
     // 4. mark current keys deleted
     CHECK(output_block);
     auto full_mutable_columns = output_block->mutate_columns();
+    const auto& missing_cids = partial_update_info->missing_cids;
+    const auto& update_cids = partial_update_info->update_cids;
     auto old_block = rowset_schema->create_block_by_cids(missing_cids);
     auto update_block = rowset_schema->create_block_by_cids(update_cids);
 
@@ -951,10 +952,57 @@ Status BaseTablet::generate_new_block_for_partial_update(
 
     RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
                                          rsid_to_rowset, update_block, &read_index_update));
 
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+    }
+
+    // build default value block
+    auto default_value_block = old_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+    if (delete_sign_column_data != nullptr) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            const auto& column = rowset_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                const auto& default_value = partial_update_info->default_values[i];
+                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+        }
+    }
+
     // build full block
     CHECK(read_index_old.size() == read_index_update.size());
+    for (auto i = 0; i < missing_cids.size(); ++i) {
+        const auto& rs_column = rowset_schema->column(missing_cids[i]);
     for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+            // If the conflicting update is a delete sign, the key does not
+            // exist anymore, so we should not read old values from the
+            // deleted data and should use the default value instead.
+            // NOTE: we are in the publish phase here, so all data was already
+            // committed. Even if `strict_mode` is true (which forbids a
+            // partial update load job from inserting new keys), this "new"
+            // key MUST be written into the newly generated segment file.
+        if (delete_sign_column_data != nullptr &&
+            delete_sign_column_data[read_index_old[idx]] != 0) {
+            auto& mutable_column = full_mutable_columns[missing_cids[i]];
+            if (rs_column.has_default_value()) {
+                mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
+            } else if (rs_column.is_nullable()) {
+                assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+                        ->insert_null_elements(1);
+            } else {
+                mutable_column->insert_default();
+            }
+            continue;
+        }
         full_mutable_columns[missing_cids[i]]->insert_from(
                 *old_block.get_columns_with_type_and_name()[i].column.get(),
                 read_index_old[idx]);
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index aaf1623367b..dc5f488e044 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -185,8 +185,8 @@ public:
             std::vector<RowsetSharedPtr>* rowsets = nullptr);
 
     static Status generate_new_block_for_partial_update(
-            TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-            const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
+            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+            const PartialUpdateReadPlan& read_plan_ori,
             const PartialUpdateReadPlan& read_plan_update,
             const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
             vectorized::Block* output_block);
diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
index e08021b4f38..f20f9680b0b 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -28,6 +28,7 @@ struct PartialUpdateInfo {
               const std::string& auto_increment_column) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
+
         this->timestamp_ms = timestamp_ms;
         this->timezone = timezone;
         missing_cids.clear();
@@ -50,8 +51,45 @@ struct PartialUpdateInfo {
         this->is_strict_mode = is_strict_mode;
         is_input_columns_contains_auto_inc_column =
                 is_partial_update && partial_update_input_columns.contains(auto_increment_column);
+        _generate_default_values_for_missing_cids(tablet_schema);
+    }
+
+private:
+    void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            auto cur_cid = missing_cids[i];
+            const auto& column = tablet_schema.column(cur_cid);
+            if (column.has_default_value()) {
+                std::string default_value;
+                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                             to_lower(tablet_schema.column(cur_cid).default_value())
+                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
+                                     std::string::npos)) {
+                    DateV2Value<DateTimeV2ValueType> dtv;
+                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dtv.debug_string();
+                } else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                            FieldType::OLAP_FIELD_TYPE_DATEV2 &&
+                                    to_lower(tablet_schema.column(cur_cid).default_value())
+                                                    .find(to_lower("CURRENT_DATE")) !=
+                                            std::string::npos)) {
+                    DateV2Value<DateV2ValueType> dv;
+                    dv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dv.debug_string();
+                } else {
+                    default_value = tablet_schema.column(cur_cid).default_value();
+                }
+                default_values.emplace_back(default_value);
+            } else {
+                // place an empty string here
+                default_values.emplace_back();
+            }
+        }
+        CHECK_EQ(missing_cids.size(), default_values.size());
     }
 
+public:
     bool is_partial_update {false};
     std::set<std::string> partial_update_input_columns;
     std::vector<uint32_t> missing_cids;
@@ -64,5 +102,8 @@ struct PartialUpdateInfo {
     std::string timezone;
     bool is_input_columns_contains_auto_inc_column = false;
     bool is_schema_contains_auto_inc_column = false;
+
+    // default values for missing cids
+    std::vector<std::string> default_values;
 };
 } // namespace doris
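The PartialUpdateInfo change above also centralizes default-value handling: _generate_default_values_for_missing_cids() materializes one literal per missing column when the load is initialized, evaluating CURRENT_TIMESTAMP/CURRENT_DATE defaults against the load's timestamp and timezone, so the segment writers and the publish path can simply index default_values[i]. A rough, simplified sketch of the idea (ColumnMeta, generate_default_values, and now_string are hypothetical stand-ins, and date and datetime rendering are collapsed into one string here):

    #include <algorithm>
    #include <cassert>
    #include <cctype>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for the schema details the real code consults.
    struct ColumnMeta {
        bool has_default = false;
        std::string default_literal; // e.g. "CURRENT_TIMESTAMP" or "100"
    };

    static std::string to_lower_copy(std::string s) {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        return s;
    }

    // One literal per missing column, computed once per load: dynamic defaults
    // are rendered from the load's clock (passed in as now_string), static
    // defaults are copied through, and columns without a default get an empty
    // placeholder, as in the patch.
    std::vector<std::string> generate_default_values(
            const std::vector<ColumnMeta>& missing_columns, const std::string& now_string) {
        std::vector<std::string> defaults;
        defaults.reserve(missing_columns.size());
        for (const auto& col : missing_columns) {
            if (!col.has_default) {
                defaults.emplace_back(); // placeholder: no default available
            } else if (to_lower_copy(col.default_literal).find("current_timestamp") !=
                               std::string::npos ||
                       to_lower_copy(col.default_literal).find("current_date") !=
                               std::string::npos) {
                defaults.push_back(now_string); // evaluated once, reused everywhere
            } else {
                defaults.push_back(col.default_literal);
            }
        }
        assert(defaults.size() == missing_columns.size());
        return defaults;
    }

Because the vector is built in lockstep with missing_cids, every consumer can index it by the same i, which is what the CHECK_EQ(missing_cids.size(), default_values.size()) at the end of the real function guarantees. The two segment-writer diffs below shrink to a single lookup accordingly.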
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index b2fb585a86c..78fd69150c2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -784,7 +784,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -794,29 +794,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     for (auto i = 0; i < cids_missing.size(); ++i) {
         const auto& column = _tablet_schema->column(cids_missing[i]);
         if (column.has_default_value()) {
-            std::string default_value;
-            if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
-                                 FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                         to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                         .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                 std::string::npos)) {
-                DateV2Value<DateTimeV2ValueType> dtv;
-                dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                  _opts.rowset_ctx->partial_update_info->timezone);
-                default_value = dtv.debug_string();
-            } else if (UNLIKELY(
-                               _tablet_schema->column(cids_missing[i]).type() ==
-                                       FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                               to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                       .find(to_lower("CURRENT_DATE")) !=
-                                       std::string::npos)) {
-                DateV2Value<DateV2ValueType> dv;
-                dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                 _opts.rowset_ctx->partial_update_info->timezone);
-                default_value = dv.debug_string();
-            } else {
-                default_value = _tablet_schema->column(cids_missing[i]).default_value();
-            }
+            const auto& default_value =
+                    _opts.rowset_ctx->partial_update_info->default_values[i];
             vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                       default_value.size());
             RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 124cfaf9b39..44c1997529e 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -608,7 +608,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -618,29 +618,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
     for (auto i = 0; i < missing_cids.size(); ++i) {
         const auto& column = _tablet_schema->column(missing_cids[i]);
         if (column.has_default_value()) {
-            std::string default_value;
-            if (UNLIKELY(_tablet_schema->column(missing_cids[i]).type() ==
-                                 FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                         to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                         .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                 std::string::npos)) {
-                DateV2Value<DateTimeV2ValueType> dtv;
-                dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                  _opts.rowset_ctx->partial_update_info->timezone);
-                default_value = dtv.debug_string();
-            } else if (UNLIKELY(
-                               _tablet_schema->column(missing_cids[i]).type() ==
-                                       FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                               to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                       .find(to_lower("CURRENT_DATE")) !=
-                                       std::string::npos)) {
-                DateV2Value<DateV2ValueType> dv;
-                dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                 _opts.rowset_ctx->partial_update_info->timezone);
-                default_value = dv.debug_string();
-            } else {
-                default_value = _tablet_schema->column(missing_cids[i]).default_value();
-            }
+            const auto& default_value =
+                    _opts.rowset_ctx->partial_update_info->default_values[i];
             vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                       default_value.size());
             RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
new file mode 100644
index 00000000000..62aa3a38c16
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
@@ -0,0 +1,5 @@
+1,10,1
+2,20,0
+3,30,1
+4,40,0
+5,50,1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index 8d3e69bbe26..784dbd69536 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -10,7 +10,7 @@
 2 2 2 2 2
 4 4 4 4 4
 
--- !with_delete_sign --
+-- !1 --
 1 \N \N \N \N 1
 1 1 1 1 1 0
 2 2 2 2 2 0
@@ -21,12 +21,51 @@
 5 5 5 5 5 0
 6 \N \N \N \N 1
 
+-- !2 --
+1 \N \N \N \N 1
+2 2 2 2 2 0
+3 \N \N \N \N 1
+4 4 4 4 4 0
+5 \N \N \N \N 1
+6 \N \N \N \N 1
+
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !after_delete --
+2 2 2 2 2
+4 4 4 4 4
+
+-- !1 --
+1 1 1 1 1 0
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
+-- !2 --
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
 -- !1 --
 1 1 1
 
 -- !2 --
 
 -- !3 --
+1 2 \N
 
 -- !1 --
 1 1 1 1
@@ -47,7 +86,7 @@
 2 2 2 2 2
 4 4 4 4 4
 
--- !with_delete_sign --
+-- !1 --
 1 \N \N \N \N 1
 1 1 1 1 1 0
 2 2 2 2 2 0
@@ -58,12 +97,51 @@
 5 5 5 5 5 0
 6 \N \N \N \N 1
 
+-- !2 --
+1 \N \N \N \N 1
+2 2 2 2 2 0
+3 \N \N \N \N 1
+4 4 4 4 4 0
+5 \N \N \N \N 1
+6 \N \N \N \N 1
+
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !after_delete --
+2 2 2 2 2
+4 4 4 4 4
+
+-- !1 --
+1 1 1 1 1 0
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
+-- !2 --
+1 1 1 1 1 1
+2 2 2 2 2 0
+3 3 3 3 3 1
+4 4 4 4 4 0
+5 5 5 5 5 1
+6 \N \N \N \N 1
+
 -- !1 --
 1 1 1
 
 -- !2 --
 
 -- !3 --
+1 2 \N
 
 -- !1 --
 1 1 1 1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
new file mode 100644
index 00000000000..faa4ca1d0bb
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+4 4 4 4 4
+5 5 5 5 5
+
+-- !sql --
+2 20 \N \N foo
+4 40 \N \N foo
+
+-- !sql --
+1 100 10 \N foo
+2 20 20 \N foo
+3 100 30 \N foo
+4 40 40 \N foo
+5 100 50 \N foo
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
index 1539293d9fd..f2d93d9d715 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
@@ -61,68 +61,67 @@ suite('test_partial_update_delete_sign') {
     sql "set skip_storage_engine_merge=true;"
     sql "set skip_delete_bitmap=true;"
     sql "sync"
-    // // skip_delete_bitmap=true, skip_delete_sign=true
-    // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-    // sql "set skip_delete_sign=true;"
-    // sql "set skip_delete_bitmap=false;"
-    // sql "sync"
-    // // skip_delete_bitmap=false, skip_delete_sign=true
-    // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-    qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+    // skip_delete_bitmap=true, skip_delete_sign=true
+    qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+    sql "set skip_delete_sign=true;"
+    sql "set skip_delete_bitmap=false;"
+    sql "sync"
+    // skip_delete_bitmap=false, skip_delete_sign=true
+    qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
     sql "drop table if exists ${tableName1};"
 
-    // sql "set skip_delete_sign=false;"
-    // sql "set skip_storage_engine_merge=false;"
-    // sql "set skip_delete_bitmap=false;"
-    // sql "sync"
-    // def tableName2 = "test_partial_update_delete_sign2"
-    // sql "DROP TABLE IF EXISTS ${tableName2};"
-    // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
-    //         `k1` int NOT NULL,
-    //         `c1` int,
-    //         `c2` int,
-    //         `c3` int,
-    //         `c4` int
-    //         )UNIQUE KEY(k1)
-    //     DISTRIBUTED BY HASH(k1) BUCKETS 1
-    //     PROPERTIES (
-    //         "enable_unique_key_merge_on_write" = "true",
-    //         "disable_auto_compaction" = "true",
-    //         "replication_num" = "1",
-    //         "function_column.sequence_col" = 'c4'
-    //         );"""
-
-    // sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
-    // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
-    // streamLoad {
-    //     table "${tableName2}"
-
-    //     set 'column_separator', ','
-    //     set 'format', 'csv'
-    //     set 'partial_columns', 'true'
-    //     set 'columns', 'k1,__DORIS_DELETE_SIGN__'
-
-    //     file 'delete_sign.csv'
-    //     time 10000 // limit inflight 10s
-    // }
-    // sql "sync"
-    // qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
-
-    // sql "set skip_delete_sign=true;"
-    // sql "set skip_storage_engine_merge=true;"
-    // sql "set skip_delete_bitmap=true;"
-    // sql "sync"
-    // // skip_delete_bitmap=true, skip_delete_sign=true
-    // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-    // sql "set skip_delete_sign=true;"
-    // sql "set skip_delete_bitmap=false;"
-    // sql "sync"
-    // // skip_delete_bitmap=false, skip_delete_sign=true
-    // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-    // sql "drop table if exists ${tableName2};"
+    sql "set skip_delete_sign=false;"
+    sql "set skip_storage_engine_merge=false;"
+    sql "set skip_delete_bitmap=false;"
+    sql "sync"
+    def tableName2 = "test_partial_update_delete_sign2"
+    sql "DROP TABLE IF EXISTS ${tableName2};"
+    sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int,
+            `c3` int,
+            `c4` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1",
+            "function_column.sequence_col" = 'c4'
+            );"""
+
+    sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+    qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+    streamLoad {
+        table "${tableName2}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true' /* NOTE: it's a partial update */
+        set 'columns', 'k1,__DORIS_DELETE_SIGN__'
+
+        file 'delete_sign.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+
+    sql "set skip_delete_sign=true;"
+    sql "set skip_storage_engine_merge=true;"
+    sql "set skip_delete_bitmap=true;"
+    sql "sync"
+    // skip_delete_bitmap=true, skip_delete_sign=true
+    qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+    sql "set skip_delete_sign=true;"
+    sql "set skip_delete_bitmap=false;"
+    sql "sync"
+    // skip_delete_bitmap=false, skip_delete_sign=true
+    qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+    sql "drop table if exists ${tableName2};"
 
     // partial update a row that has been deleted by delete sign(table without sequence column)
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
new file mode 100644
index 00000000000..7e2cd9cdfe3
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
@@ -0,0 +1,151 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_delete_sign_with_conflict") {
+    def dbName = context.config.getDbNameByFile(context.file)
+    def tableName = "test_partial_update_delete_sign_with_conflict"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int NOT NULL,
+                `c1` int default 100,
+                `c2` int,
+                `c3` int,
+                `c4` varchar(100) default 'foo'
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+    sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // NOTE: use stream load 2PC to construct a conflict in the publish phase
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u ${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc"
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = {txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals(st, "VISIBLE")
+    }
+
+    // concurrent load 1
+    String txnId1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'k1,c1,label_c2'
+        set 'merge_type', 'MERGE'
+        set 'delete', 'label_c2=1'
+        set 'strict_mode', 'false'
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel_with_delete_sign.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    String txnId2
+    // concurrent load 2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,c2'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel3.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+    sql "sync;"
+
+    // complete load 1 first
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 10)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // publish will retry until success
+    // FE retry may take longer time, wait for 20 secs
+    do_streamload_2pc_commit(txnId2)
+    wait_for_publish(txnId2, 20)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org