This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 57e02cd5e31f871bc762aee6dec14dc6cb69d851
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Fri Jun 14 09:27:09 2024 +0800

    [fix](partial update) partial update should not read old fields from rows with delete sign (#36210)
    
    Issue Number: close #34296
    
    1. When a partial update fills in the missing fields, if a load job
    previously wrote a row with a delete sign, the old column values of
    that delete-signed row are read back as well, so the newly written
    data also becomes invisible (see the sketch below).
    2. This problem was fixed in #24877 but reintroduced in #26721, and it
    went unnoticed because the test case's expected output was changed to
    the wrong result in #26721.
    3. The fix in #24877 did not account for concurrent conflicts in the
    publish phase; this PR adds that handling along with a corresponding
    test case.
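
    As a reading aid, a minimal self-contained C++ sketch of the fill rule
    this change enforces (hypothetical stand-in types, not the Doris API;
    the real logic lives in generate_new_block_for_partial_update() in the
    diff below):

        // When the latest committed version of a key carries a delete sign,
        // a missing column is filled from its declared default, else NULL,
        // else the type's default value -- never from the deleted row.
        #include <cstdint>
        #include <iostream>
        #include <optional>
        #include <string>

        struct OldRow {
            int8_t delete_sign;      // __DORIS_DELETE_SIGN__ of the old version
            std::string field_value; // old value of one missing column
        };

        struct Column {
            bool has_default;
            bool nullable;
            std::string default_value;
        };

        // std::nullopt models SQL NULL.
        std::optional<std::string> fill_missing(const OldRow& old_row, const Column& col) {
            if (old_row.delete_sign != 0) {            // key was deleted: start fresh
                if (col.has_default) return col.default_value;
                if (col.nullable) return std::nullopt;
                return std::string{};                  // fall back to the type default
            }
            return old_row.field_value;                // ordinary partial-update path
        }

        int main() {
            Column c4{true, true, "foo"};
            std::cout << *fill_missing({1, "stale"}, c4) << '\n'; // prints "foo"
            std::cout << *fill_missing({0, "kept"}, c4) << '\n';  // prints "kept"
        }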
---
 be/src/olap/base_tablet.cpp                        |  58 +++++++-
 be/src/olap/base_tablet.h                          |   4 +-
 be/src/olap/partial_update_info.h                  |  41 ++++++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  27 +---
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +---
 .../partial_update_parallel_with_delete_sign.csv   |   5 +
 .../test_partial_update_delete_sign.out            |  82 ++++++++++-
 ...st_partial_update_delete_sign_with_conflict.out |  19 +++
 .../test_partial_update_delete_sign.groovy         | 117 ++++++++--------
 ...partial_update_delete_sign_with_conflict.groovy | 151 +++++++++++++++++++++
 10 files changed, 415 insertions(+), 116 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 89365e0f4e4..2fa90051165 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -798,8 +798,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
         auto partial_update_info = rowset_writer->get_partial_update_info();
         DCHECK(partial_update_info);
         RETURN_IF_ERROR(generate_new_block_for_partial_update(
-                rowset_schema, partial_update_info->missing_cids, partial_update_info->update_cids,
-                read_plan_ori, read_plan_update, rsid_to_rowset, &block));
+                rowset_schema, partial_update_info.get(), read_plan_ori, read_plan_update,
+                rsid_to_rowset, &block));
         RETURN_IF_ERROR(sort_block(block, ordered_block));
         RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block));
         if (new_generated_rows != rowset_writer->num_rows()) {
@@ -928,9 +928,8 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t
 }
 
 Status BaseTablet::generate_new_block_for_partial_update(
-        TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-        const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
-        const PartialUpdateReadPlan& read_plan_update,
+        TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+        const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
         vectorized::Block* output_block) {
     // do partial update related works
@@ -940,6 +939,8 @@ Status BaseTablet::generate_new_block_for_partial_update(
     // 4. mark current keys deleted
     CHECK(output_block);
     auto full_mutable_columns = output_block->mutate_columns();
+    const auto& missing_cids = partial_update_info->missing_cids;
+    const auto& update_cids = partial_update_info->update_cids;
     auto old_block = rowset_schema->create_block_by_cids(missing_cids);
     auto update_block = rowset_schema->create_block_by_cids(update_cids);
 
@@ -951,10 +952,57 @@ Status BaseTablet::generate_new_block_for_partial_update(
     RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
                                          rsid_to_rowset, update_block, &read_index_update));
 
+    const vectorized::Int8* delete_sign_column_data = nullptr;
+    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
+                old_block.try_get_by_name(DELETE_SIGN);
+        delete_sign_column != nullptr) {
+        auto& delete_sign_col =
+                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
+        delete_sign_column_data = delete_sign_col.get_data().data();
+    }
+
+    // build default value block
+    auto default_value_block = old_block.clone_empty();
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+    if (delete_sign_column_data != nullptr) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            const auto& column = rowset_schema->column(missing_cids[i]);
+            if (column.has_default_value()) {
+                const auto& default_value = partial_update_info->default_values[i];
+                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
+                                          default_value.size());
+                RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
+                        rb, mutable_default_value_columns[i].get()));
+            }
+        }
+    }
+
     // build full block
     CHECK(read_index_old.size() == read_index_update.size());
+
     for (auto i = 0; i < missing_cids.size(); ++i) {
+        const auto& rs_column = rowset_schema->column(missing_cids[i]);
         for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+            // If the conflicting update is a delete sign, the key does not
+            // exist anymore, so we should not read old values from the deleted
+            // data and should use the default values instead.
+            // NOTE: since we are now in the publish phase, all data was
+            // committed before; even if `strict_mode` is true (which forbids a
+            // partial update load job from inserting new keys), this "new" key
+            // MUST be written into the newly generated segment file.
+            if (delete_sign_column_data != nullptr &&
+                delete_sign_column_data[read_index_old[idx]] != 0) {
+                auto& mutable_column = full_mutable_columns[missing_cids[i]];
+                if (rs_column.has_default_value()) {
+                    mutable_column->insert_from(*mutable_default_value_columns[i].get(), 0);
+                } else if (rs_column.is_nullable()) {
+                    assert_cast<vectorized::ColumnNullable*>(mutable_column.get())
+                            ->insert_null_elements(1);
+                } else {
+                    mutable_column->insert_default();
+                }
+                continue;
+            }
             full_mutable_columns[missing_cids[i]]->insert_from(
                     *old_block.get_columns_with_type_and_name()[i].column.get(),
                     read_index_old[idx]);
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index aaf1623367b..dc5f488e044 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -185,8 +185,8 @@ public:
                                           std::vector<RowsetSharedPtr>* rowsets = nullptr);
 
     static Status generate_new_block_for_partial_update(
-            TabletSchemaSPtr rowset_schema, const std::vector<uint32>& missing_cids,
-            const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori,
+            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
+            const PartialUpdateReadPlan& read_plan_ori,
             const PartialUpdateReadPlan& read_plan_update,
             const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
             vectorized::Block* output_block);
diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h
index e08021b4f38..f20f9680b0b 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -28,6 +28,7 @@ struct PartialUpdateInfo {
               const std::string& auto_increment_column) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
+
         this->timestamp_ms = timestamp_ms;
         this->timezone = timezone;
         missing_cids.clear();
@@ -50,8 +51,45 @@ struct PartialUpdateInfo {
         this->is_strict_mode = is_strict_mode;
         is_input_columns_contains_auto_inc_column =
                is_partial_update && partial_update_input_columns.contains(auto_increment_column);
+        _generate_default_values_for_missing_cids(tablet_schema);
+    }
+
+private:
+    void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema) {
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            auto cur_cid = missing_cids[i];
+            const auto& column = tablet_schema.column(cur_cid);
+            if (column.has_default_value()) {
+                std::string default_value;
+                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                             to_lower(tablet_schema.column(cur_cid).default_value())
+                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
+                                     std::string::npos)) {
+                    DateV2Value<DateTimeV2ValueType> dtv;
+                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dtv.debug_string();
+                } else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
+                                            FieldType::OLAP_FIELD_TYPE_DATEV2 &&
+                                    to_lower(tablet_schema.column(cur_cid).default_value())
+                                                    .find(to_lower("CURRENT_DATE")) !=
+                                            std::string::npos)) {
+                    DateV2Value<DateV2ValueType> dv;
+                    dv.from_unixtime(timestamp_ms / 1000, timezone);
+                    default_value = dv.debug_string();
+                } else {
+                    default_value = tablet_schema.column(cur_cid).default_value();
+                }
+                default_values.emplace_back(default_value);
+            } else {
+                // place an empty string here
+                default_values.emplace_back();
+            }
+        }
+        CHECK_EQ(missing_cids.size(), default_values.size());
     }
 
+public:
     bool is_partial_update {false};
     std::set<std::string> partial_update_input_columns;
     std::vector<uint32_t> missing_cids;
@@ -64,5 +102,8 @@ struct PartialUpdateInfo {
     std::string timezone;
     bool is_input_columns_contains_auto_inc_column = false;
     bool is_schema_contains_auto_inc_column = false;
+
+    // default values for missing cids
+    std::vector<std::string> default_values;
 };
 } // namespace doris
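
For intuition, a standalone approximation of what
_generate_default_values_for_missing_cids() above does for time-derived
defaults: CURRENT_TIMESTAMP / CURRENT_DATE are frozen to the load's timestamp
once per load job, so every segment written for that job sees identical
default values. This sketch substitutes std::gmtime for Doris's DateV2Value
and exact-match comparison for the real code's case-insensitive substring
search; treat it as an illustration under those assumptions, not the actual
API.

    #include <ctime>
    #include <iostream>
    #include <string>

    std::string materialize_default(const std::string& declared_default,
                                    std::time_t load_time_s) {
        // The real code applies the load's timezone; UTC keeps the sketch small.
        std::tm tm_utc = *std::gmtime(&load_time_s);
        char buf[32];
        if (declared_default == "CURRENT_TIMESTAMP") {
            std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm_utc);
            return buf;
        }
        if (declared_default == "CURRENT_DATE") {
            std::strftime(buf, sizeof(buf), "%Y-%m-%d", &tm_utc);
            return buf;
        }
        return declared_default; // literal defaults pass through unchanged
    }

    int main() {
        std::cout << materialize_default("CURRENT_TIMESTAMP", 1718328429) << '\n';
        std::cout << materialize_default("bar", 0) << '\n'; // prints "bar"
    }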
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index b2fb585a86c..78fd69150c2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -784,7 +784,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -794,29 +794,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
         for (auto i = 0; i < cids_missing.size(); ++i) {
             const auto& column = _tablet_schema->column(cids_missing[i]);
             if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(_tablet_schema->column(cids_missing[i]).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    DateV2Value<DateTimeV2ValueType> dtv;
-                    dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                      _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dtv.debug_string();
-                } else if (UNLIKELY(
-                                   _tablet_schema->column(cids_missing[i]).type() ==
-                                           FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                                   to_lower(_tablet_schema->column(cids_missing[i]).default_value())
-                                                   .find(to_lower("CURRENT_DATE")) !=
-                                           std::string::npos)) {
-                    DateV2Value<DateV2ValueType> dv;
-                    dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                     _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dv.debug_string();
-                } else {
-                    default_value = _tablet_schema->column(cids_missing[i]).default_value();
-                }
+                const auto& default_value =
+                        _opts.rowset_ctx->partial_update_info->default_values[i];
                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                          default_value.size());
                RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 124cfaf9b39..44c1997529e 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -608,7 +608,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
     const vectorized::Int8* delete_sign_column_data = nullptr;
     if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
                 old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr && _tablet_schema->has_sequence_col()) {
+        delete_sign_column != nullptr) {
         auto& delete_sign_col =
                reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
         delete_sign_column_data = delete_sign_col.get_data().data();
@@ -618,29 +618,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         for (auto i = 0; i < missing_cids.size(); ++i) {
             const auto& column = _tablet_schema->column(missing_cids[i]);
             if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(_tablet_schema->column(missing_cids[i]).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                             .find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    DateV2Value<DateTimeV2ValueType> dtv;
-                    dtv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                      _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dtv.debug_string();
-                } else if (UNLIKELY(
-                                   _tablet_schema->column(missing_cids[i]).type() ==
-                                           FieldType::OLAP_FIELD_TYPE_DATEV2 &&
-                                   to_lower(_tablet_schema->column(missing_cids[i]).default_value())
-                                                   .find(to_lower("CURRENT_DATE")) !=
-                                           std::string::npos)) {
-                    DateV2Value<DateV2ValueType> dv;
-                    dv.from_unixtime(_opts.rowset_ctx->partial_update_info->timestamp_ms / 1000,
-                                     _opts.rowset_ctx->partial_update_info->timezone);
-                    default_value = dv.debug_string();
-                } else {
-                    default_value = _tablet_schema->column(missing_cids[i]).default_value();
-                }
+                const auto& default_value =
+                        _opts.rowset_ctx->partial_update_info->default_values[i];
                vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                          default_value.size());
                RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
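
Design note on the two writer hunks above: the duplicated
CURRENT_TIMESTAMP / CURRENT_DATE blocks in segment_writer.cpp and
vertical_segment_writer.cpp collapse into a single lookup because
PartialUpdateInfo now precomputes default_values parallel to missing_cids.
A hedged sketch of that precompute-once, index-everywhere shape
(hypothetical minimal types, not the Doris structs):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct PartialUpdateDefaults {
        std::vector<uint32_t> missing_cids;      // columns absent from the load
        std::vector<std::string> default_values; // parallel to missing_cids
    };

    // Writer-side lookup: the i-th missing column's pre-parsed default text.
    // An empty string means the column declared no DEFAULT clause.
    const std::string& default_for(const PartialUpdateDefaults& info, size_t i) {
        assert(info.default_values.size() == info.missing_cids.size());
        return info.default_values[i];
    }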
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
new file mode 100644
index 00000000000..62aa3a38c16
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_parallel_with_delete_sign.csv
@@ -0,0 +1,5 @@
+1,10,1
+2,20,0
+3,30,1
+4,40,0
+5,50,1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
index 8d3e69bbe26..784dbd69536 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.out
@@ -10,7 +10,7 @@
 2      2       2       2       2
 4      4       4       4       4
 
--- !with_delete_sign --
+-- !1 --
 1      \N      \N      \N      \N      1
 1      1       1       1       1       0
 2      2       2       2       2       0
@@ -21,12 +21,51 @@
 5      5       5       5       5       0
 6      \N      \N      \N      \N      1
 
+-- !2 --
+1      \N      \N      \N      \N      1
+2      2       2       2       2       0
+3      \N      \N      \N      \N      1
+4      4       4       4       4       0
+5      \N      \N      \N      \N      1
+6      \N      \N      \N      \N      1
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !after_delete --
+2      2       2       2       2
+4      4       4       4       4
+
+-- !1 --
+1      1       1       1       1       0
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
+-- !2 --
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
 -- !1 --
 1      1       1
 
 -- !2 --
 
 -- !3 --
+1      2       \N
 
 -- !1 --
 1      1       1       1
@@ -47,7 +86,7 @@
 2      2       2       2       2
 4      4       4       4       4
 
--- !with_delete_sign --
+-- !1 --
 1      \N      \N      \N      \N      1
 1      1       1       1       1       0
 2      2       2       2       2       0
@@ -58,12 +97,51 @@
 5      5       5       5       5       0
 6      \N      \N      \N      \N      1
 
+-- !2 --
+1      \N      \N      \N      \N      1
+2      2       2       2       2       0
+3      \N      \N      \N      \N      1
+4      4       4       4       4       0
+5      \N      \N      \N      \N      1
+6      \N      \N      \N      \N      1
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !after_delete --
+2      2       2       2       2
+4      4       4       4       4
+
+-- !1 --
+1      1       1       1       1       0
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
+-- !2 --
+1      1       1       1       1       1
+2      2       2       2       2       0
+3      3       3       3       3       1
+4      4       4       4       4       0
+5      5       5       5       5       1
+6      \N      \N      \N      \N      1
+
 -- !1 --
 1      1       1
 
 -- !2 --
 
 -- !3 --
+1      2       \N
 
 -- !1 --
 1      1       1       1
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
new file mode 100644
index 00000000000..faa4ca1d0bb
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+4      4       4       4       4
+5      5       5       5       5
+
+-- !sql --
+2      20      \N      \N      foo
+4      40      \N      \N      foo
+
+-- !sql --
+1      100     10      \N      foo
+2      20      20      \N      foo
+3      100     30      \N      foo
+4      40      40      \N      foo
+5      100     50      \N      foo
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
index 1539293d9fd..f2d93d9d715 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign.groovy
@@ -61,68 +61,67 @@ suite('test_partial_update_delete_sign') {
             sql "set skip_storage_engine_merge=true;"
             sql "set skip_delete_bitmap=true;"
             sql "sync"
-            // // skip_delete_bitmap=true, skip_delete_sign=true
-            // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // // skip_delete_bitmap=false, skip_delete_sign=true
-            // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-            qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ 
from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+            // skip_delete_bitmap=true, skip_delete_sign=true
+            qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            // skip_delete_bitmap=false, skip_delete_sign=true
+            qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
             sql "drop table if exists ${tableName1};"
 
 
-            // sql "set skip_delete_sign=false;"
-            // sql "set skip_storage_engine_merge=false;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // def tableName2 = "test_partial_update_delete_sign2"
-            // sql "DROP TABLE IF EXISTS ${tableName2};"
-            // sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
-            //         `k1` int NOT NULL,
-            //         `c1` int,
-            //         `c2` int,
-            //         `c3` int,
-            //         `c4` int
-            //         )UNIQUE KEY(k1)
-            //     DISTRIBUTED BY HASH(k1) BUCKETS 1
-            //     PROPERTIES (
-            //         "enable_unique_key_merge_on_write" = "true",
-            //         "disable_auto_compaction" = "true",
-            //         "replication_num" = "1",
-            //         "function_column.sequence_col" = 'c4'
-            //     );"""
-
-            // sql "insert into ${tableName2} 
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
-            // qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
-            // streamLoad {
-            //     table "${tableName2}"
-
-            //     set 'column_separator', ','
-            //     set 'format', 'csv'
-            //     set 'partial_columns', 'true'
-            //     set 'columns', 'k1,__DORIS_DELETE_SIGN__'
-
-            //     file 'delete_sign.csv'
-            //     time 10000 // limit inflight 10s
-            // }
-            // sql "sync"
-            // qt_after_delete "select * from ${tableName2} order by 
k1,c1,c2,c3,c4;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_storage_engine_merge=true;"
-            // sql "set skip_delete_bitmap=true;"
-            // sql "sync"
-            // // skip_delete_bitmap=true, skip_delete_sign=true
-            // qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-
-            // sql "set skip_delete_sign=true;"
-            // sql "set skip_delete_bitmap=false;"
-            // sql "sync"
-            // // skip_delete_bitmap=false, skip_delete_sign=true
-            // qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
-            // sql "drop table if exists ${tableName2};"
+            sql "set skip_delete_sign=false;"
+            sql "set skip_storage_engine_merge=false;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            def tableName2 = "test_partial_update_delete_sign2"
+            sql "DROP TABLE IF EXISTS ${tableName2};"
+            sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
+                    `k1` int NOT NULL,
+                    `c1` int,
+                    `c2` int,
+                    `c3` int,
+                    `c4` int
+                    )UNIQUE KEY(k1)
+                DISTRIBUTED BY HASH(k1) BUCKETS 1
+                PROPERTIES (
+                    "enable_unique_key_merge_on_write" = "true",
+                    "disable_auto_compaction" = "true",
+                    "replication_num" = "1",
+                    "function_column.sequence_col" = 'c4'
+                );"""
+
+            sql "insert into ${tableName2} 
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+            qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
+            streamLoad {
+                table "${tableName2}"
+
+                set 'column_separator', ','
+                set 'format', 'csv'
+                set 'partial_columns', 'true' /* NOTE: it's a partial update */
+                set 'columns', 'k1,__DORIS_DELETE_SIGN__'
+
+                file 'delete_sign.csv'
+                time 10000 // limit inflight 10s
+            }
+            sql "sync"
+            qt_after_delete "select * from ${tableName2} order by 
k1,c1,c2,c3,c4;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_storage_engine_merge=true;"
+            sql "set skip_delete_bitmap=true;"
+            sql "sync"
+            // skip_delete_bitmap=true, skip_delete_sign=true
+            qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+
+            sql "set skip_delete_sign=true;"
+            sql "set skip_delete_bitmap=false;"
+            sql "sync"
+            // skip_delete_bitmap=false, skip_delete_sign=true
+            qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from 
${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
+            sql "drop table if exists ${tableName2};"
 
 
            // partial update a row that has been deleted by delete sign(table without sequence column)
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
new file mode 100644
index 00000000000..7e2cd9cdfe3
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete_sign_with_conflict.groovy
@@ -0,0 +1,151 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_delete_sign_with_conflict") {
+    def dbName = context.config.getDbNameByFile(context.file)
+    def tableName = "test_partial_update_delete_sign_with_conflict"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int NOT NULL,
+            `c1` int default 100,
+            `c2` int,
+            `c3` int,
+            `c4` varchar(100) default 'foo'
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${tableName} 
values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // NOTE: use stream load 2PC to construct the publish-phase conflict
+    def do_streamload_2pc_commit = { txnId ->
+        def command = "curl -X PUT --location-trusted -u 
${context.config.feHttpUser}:${context.config.feHttpPassword}" +
+                " -H txn_id:${txnId}" +
+                " -H txn_operation:commit" +
+                " 
http://${context.config.feHttpAddress}/api/${dbName}/${tableName}/_stream_load_2pc";
+        log.info("http_stream execute 2pc: ${command}")
+
+        def process = command.execute()
+        code = process.waitFor()
+        out = process.text
+        json2pc = parseJson(out)
+        log.info("http_stream 2pc result: ${out}".toString())
+        assertEquals(code, 0)
+        assertEquals("success", json2pc.status.toLowerCase())
+    }
+
+    def wait_for_publish = {txnId, waitSecond ->
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && 
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+            Thread.sleep(1000)
+            waitSecond -= 1
+            def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+            assertNotNull(result)
+            st = result[0].TransactionStatus
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        assertEquals(st, "VISIBLE")
+    }
+
+    // concurrent load 1
+    String txnId1
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'k1,c1,label_c2'
+        set 'merge_type', 'MERGE'
+        set 'delete', 'label_c2=1'
+        set 'strict_mode', 'false'
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel_with_delete_sign.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId1 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    String txnId2
+    // concurrent load 2
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,c2'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'partial_update_parallel3.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId2 = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+    sql "sync;"
+
+    // complete load 1 first
+    do_streamload_2pc_commit(txnId1)
+    wait_for_publish(txnId1, 10)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+
+    // publish will retry until success
+    // FE retry may take longer, so wait up to 20 secs
+    do_streamload_2pc_commit(txnId2)
+    wait_for_publish(txnId2, 20)
+
+    sql "sync;"
+    qt_sql "select * from ${tableName} order by k1,c1,c2,c3,c4;"
+}

