This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1fbfb81b8a1 [branch-2.1] Picks "[Fix](partial update) Persist 
partial_update_info in RocksDB in case of BE restart after a partial update has 
commited #38331" (#39035)
1fbfb81b8a1 is described below

commit 1fbfb81b8a1698e6383f8b50f8b01b91d74bf287
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Thu Aug 8 14:50:08 2024 +0800

    [branch-2.1] Picks "[Fix](partial update) Persist partial_update_info in 
RocksDB in case of BE restart after a partial update has commited #38331" 
(#39035)
    
    picks https://github.com/apache/doris/pull/38331 and
    https://github.com/apache/doris/pull/39066
---
 be/src/olap/partial_update_info.cpp                | 155 ++++++++++++++++++++
 be/src/olap/partial_update_info.h                  |  76 ++--------
 be/src/olap/rowset/rowset_meta_manager.cpp         |  94 +++++++++++++
 be/src/olap/rowset/rowset_meta_manager.h           |  21 +++
 be/src/olap/rowset_builder.cpp                     |   8 +-
 be/src/olap/storage_engine.cpp                     |  32 +++++
 be/src/olap/storage_engine.h                       |   2 +
 be/src/olap/txn_manager.cpp                        |  71 +++++++++-
 be/src/olap/txn_manager.h                          |  10 +-
 gensrc/proto/olap_file.proto                       |  15 ++
 .../unique_with_mow_p0/partial_update/data1.csv    |   2 +
 .../test_partial_update_conflict_be_restart.out    |  21 +++
 .../test_partial_update_conflict_be_restart.groovy | 156 +++++++++++++++++++++
 13 files changed, 586 insertions(+), 77 deletions(-)

diff --git a/be/src/olap/partial_update_info.cpp 
b/be/src/olap/partial_update_info.cpp
new file mode 100644
index 00000000000..5867a77559b
--- /dev/null
+++ b/be/src/olap/partial_update_info.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/partial_update_info.h"
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include "olap/tablet_schema.h"
+
+namespace doris {
+
+// Initializes this info for one load from the tablet schema and the load's input columns.
+//
+// Every derived field (missing_cids, update_cids, default_values and the two flags that the
+// column loop below only ever flips in one direction) is reset first, so calling init() again
+// on the same object gives the same result as calling it on a freshly constructed one. Without
+// the reset a second call would keep stale flags and append a second set of default_values,
+// tripping the CHECK_EQ in _generate_default_values_for_missing_cids().
+void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
+                             const std::set<std::string>& partial_update_cols, bool is_strict_mode,
+                             int64_t timestamp_ms, const std::string& timezone,
+                             const std::string& auto_increment_column, int64_t cur_max_version) {
+    is_partial_update = partial_update;
+    partial_update_input_columns = partial_update_cols;
+    max_version_in_flush_phase = cur_max_version;
+    this->timestamp_ms = timestamp_ms;
+    this->timezone = timezone;
+    this->is_strict_mode = is_strict_mode;
+
+    missing_cids.clear();
+    update_cids.clear();
+    default_values.clear();
+    // the loop below can only clear this flag, so start from "new rows are insertable"
+    can_insert_new_rows_in_partial_update = true;
+    // ...and it can only set this one
+    is_schema_contains_auto_inc_column = false;
+
+    for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
+        // bind by const reference instead of copying the TabletColumn for every column
+        const auto& tablet_column = tablet_schema.column(i);
+        if (!partial_update_input_columns.contains(tablet_column.name())) {
+            // a column the load does not supply must have a default value, be nullable, or be
+            // the auto-increment column; otherwise new rows cannot be inserted
+            missing_cids.emplace_back(i);
+            if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
+                tablet_schema.auto_increment_column() != tablet_column.name()) {
+                can_insert_new_rows_in_partial_update = false;
+            }
+        } else {
+            update_cids.emplace_back(i);
+        }
+        if (auto_increment_column == tablet_column.name()) {
+            is_schema_contains_auto_inc_column = true;
+        }
+    }
+    is_input_columns_contains_auto_inc_column =
+            is_partial_update && partial_update_input_columns.contains(auto_increment_column);
+    _generate_default_values_for_missing_cids(tablet_schema);
+}
+
+// Serializes every field of this PartialUpdateInfo into `partial_update_info_pb`.
+// Repeated fields are appended to (add_*), so callers are expected to pass a freshly
+// constructed message, as TxnManager::commit_txn does.
+// NOTE(review): confirm no caller reuses a populated message.
+void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
+    partial_update_info_pb->set_is_partial_update(is_partial_update);
+    partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
+    for (const auto& col : partial_update_input_columns) {
+        partial_update_info_pb->add_partial_update_input_columns(col);
+    }
+    for (auto cid : missing_cids) {
+        partial_update_info_pb->add_missing_cids(cid);
+    }
+    for (auto cid : update_cids) {
+        partial_update_info_pb->add_update_cids(cid);
+    }
+    partial_update_info_pb->set_can_insert_new_rows_in_partial_update(
+            can_insert_new_rows_in_partial_update);
+    partial_update_info_pb->set_is_strict_mode(is_strict_mode);
+    partial_update_info_pb->set_timestamp_ms(timestamp_ms);
+    partial_update_info_pb->set_timezone(timezone);
+    partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
+            is_input_columns_contains_auto_inc_column);
+    partial_update_info_pb->set_is_schema_contains_auto_inc_column(
+            is_schema_contains_auto_inc_column);
+    // default_values are persisted as the already-materialized strings (CURRENT_TIMESTAMP /
+    // CURRENT_DATE resolved from timestamp_ms), so a recovered info reuses exactly these values
+    for (const auto& value : default_values) {
+        partial_update_info_pb->add_default_values(value);
+    }
+}
+
+// Restores this object from a message written by to_pb(), e.g. when TxnManager recovers a
+// committed partial-update txn after a BE restart. Every field that to_pb() writes is
+// overwritten, and the containers are cleared before being refilled. A message without
+// max_version_in_flush_phase decodes to -1, the same default init() uses.
+// NOTE(review): the message is only read here; the parameter could be
+// `const PartialUpdateInfoPB&` (needs the matching change in partial_update_info.h).
+void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
+    is_partial_update = partial_update_info_pb->is_partial_update();
+    max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
+                                         ? partial_update_info_pb->max_version_in_flush_phase()
+                                         : -1;
+    partial_update_input_columns.clear();
+    for (const auto& col : partial_update_info_pb->partial_update_input_columns()) {
+        partial_update_input_columns.insert(col);
+    }
+    missing_cids.clear();
+    for (auto cid : partial_update_info_pb->missing_cids()) {
+        missing_cids.push_back(cid);
+    }
+    update_cids.clear();
+    for (auto cid : partial_update_info_pb->update_cids()) {
+        update_cids.push_back(cid);
+    }
+    can_insert_new_rows_in_partial_update =
+            partial_update_info_pb->can_insert_new_rows_in_partial_update();
+    is_strict_mode = partial_update_info_pb->is_strict_mode();
+    timestamp_ms = partial_update_info_pb->timestamp_ms();
+    timezone = partial_update_info_pb->timezone();
+    is_input_columns_contains_auto_inc_column =
+            partial_update_info_pb->is_input_columns_contains_auto_inc_column();
+    is_schema_contains_auto_inc_column =
+            partial_update_info_pb->is_schema_contains_auto_inc_column();
+    // default values are taken verbatim; they are not recomputed from the current schema/time
+    default_values.clear();
+    for (const auto& value : partial_update_info_pb->default_values()) {
+        default_values.push_back(value);
+    }
+}
+
+// One-line description for logs: reports the number of updated / missing columns (not their
+// ids), the strict-mode flag and max_version_in_flush_phase.
+std::string PartialUpdateInfo::summary() const {
+    return fmt::format(
+            "update_cids={}, missing_cids={}, is_strict_mode={}, max_version_in_flush_phase={}",
+            update_cids.size(), missing_cids.size(), is_strict_mode, max_version_in_flush_phase);
+}
+
+// Rebuilds default_values so it holds exactly one entry per element of missing_cids, in the
+// same order. CURRENT_TIMESTAMP on a DATETIMEV2 column and CURRENT_DATE on a DATEV2 column are
+// materialized from this load's timestamp_ms / timezone; other defaults are copied as-is, and
+// columns without a default get an empty-string placeholder.
+void PartialUpdateInfo::_generate_default_values_for_missing_cids(
+        const TabletSchema& tablet_schema) {
+    // start from a clean slate: appending to leftovers of a previous call would make the
+    // CHECK_EQ below fail
+    default_values.clear();
+    default_values.reserve(missing_cids.size());
+    for (unsigned int cur_cid : missing_cids) {
+        const auto& column = tablet_schema.column(cur_cid);
+        if (!column.has_default_value()) {
+            // place an empty string here
+            default_values.emplace_back();
+            continue;
+        }
+        std::string default_value;
+        if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                     to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
+                             std::string::npos)) {
+            DateV2Value<DateTimeV2ValueType> dtv;
+            dtv.from_unixtime(timestamp_ms / 1000, timezone);
+            default_value = dtv.debug_string();
+        } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
+                            to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
+                                    std::string::npos)) {
+            DateV2Value<DateV2ValueType> dv;
+            dv.from_unixtime(timestamp_ms / 1000, timezone);
+            default_value = dv.debug_string();
+        } else {
+            default_value = column.default_value();
+        }
+        default_values.emplace_back(std::move(default_value));
+    }
+    CHECK_EQ(missing_cids.size(), default_values.size());
+}
+} // namespace doris
diff --git a/be/src/olap/partial_update_info.h 
b/be/src/olap/partial_update_info.h
index 4b62cb8f0ff..987f31ec7f7 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -16,78 +16,26 @@
 // under the License.
 
 #pragma once
-
-#include "olap/tablet_schema.h"
+#include <cstdint>
+#include <set>
+#include <string>
+#include <vector>
 
 namespace doris {
+class TabletSchema;
+class PartialUpdateInfoPB;
 
 struct PartialUpdateInfo {
     void init(const TabletSchema& tablet_schema, bool partial_update,
-              const std::set<string>& partial_update_cols, bool is_strict_mode,
+              const std::set<std::string>& partial_update_cols, bool 
is_strict_mode,
               int64_t timestamp_ms, const std::string& timezone,
-              const std::string& auto_increment_column, int64_t 
cur_max_version = -1) {
-        is_partial_update = partial_update;
-        partial_update_input_columns = partial_update_cols;
-        max_version_in_flush_phase = cur_max_version;
-        this->timestamp_ms = timestamp_ms;
-        this->timezone = timezone;
-        missing_cids.clear();
-        update_cids.clear();
-        for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
-            auto tablet_column = tablet_schema.column(i);
-            if (!partial_update_input_columns.contains(tablet_column.name())) {
-                missing_cids.emplace_back(i);
-                if (!tablet_column.has_default_value() && 
!tablet_column.is_nullable() &&
-                    tablet_schema.auto_increment_column() != 
tablet_column.name()) {
-                    can_insert_new_rows_in_partial_update = false;
-                }
-            } else {
-                update_cids.emplace_back(i);
-            }
-            if (auto_increment_column == tablet_column.name()) {
-                is_schema_contains_auto_inc_column = true;
-            }
-        }
-        this->is_strict_mode = is_strict_mode;
-        is_input_columns_contains_auto_inc_column =
-                is_partial_update && 
partial_update_input_columns.contains(auto_increment_column);
-        _generate_default_values_for_missing_cids(tablet_schema);
-    }
+              const std::string& auto_increment_column, int64_t 
cur_max_version = -1);
+    void to_pb(PartialUpdateInfoPB* partial_update_info) const;
+    void from_pb(PartialUpdateInfoPB* partial_update_info);
+    std::string summary() const;
 
 private:
-    void _generate_default_values_for_missing_cids(const TabletSchema& 
tablet_schema) {
-        for (auto i = 0; i < missing_cids.size(); ++i) {
-            auto cur_cid = missing_cids[i];
-            const auto& column = tablet_schema.column(cur_cid);
-            if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             
to_lower(tablet_schema.column(cur_cid).default_value())
-                                             
.find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    DateV2Value<DateTimeV2ValueType> dtv;
-                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
-                    default_value = dtv.debug_string();
-                } else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
-                                            FieldType::OLAP_FIELD_TYPE_DATEV2 
&&
-                                    
to_lower(tablet_schema.column(cur_cid).default_value())
-                                                    
.find(to_lower("CURRENT_DATE")) !=
-                                            std::string::npos)) {
-                    DateV2Value<DateV2ValueType> dv;
-                    dv.from_unixtime(timestamp_ms / 1000, timezone);
-                    default_value = dv.debug_string();
-                } else {
-                    default_value = 
tablet_schema.column(cur_cid).default_value();
-                }
-                default_values.emplace_back(default_value);
-            } else {
-                // place an empty string here
-                default_values.emplace_back();
-            }
-        }
-        CHECK_EQ(missing_cids.size(), default_values.size());
-    }
+    void _generate_default_values_for_missing_cids(const TabletSchema& 
tablet_schema);
 
 public:
     bool is_partial_update {false};
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp 
b/be/src/olap/rowset/rowset_meta_manager.cpp
index 38911327d84..d89be5ab8ec 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -535,4 +535,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* 
meta,
     return status;
 }
 
+// Persists `partial_update_info_pb` in the meta column family under the key
+// pui_{tablet_id}_{partition_id}_{txn_id}, so the info can be recovered if the BE restarts
+// after the txn is committed. Returns SERIALIZE_PROTOBUF_ERROR if the message cannot be
+// serialized, otherwise the status of OlapMeta::put.
+Status RowsetMetaManager::save_partial_update_info(
+        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+        const PartialUpdateInfoPB& partial_update_info_pb) {
+    std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string value;
+    if (!partial_update_info_pb.SerializeToString(&value)) {
+        return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
+                "serialize partial update info failed. key={}", key);
+    }
+    VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size();
+    return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
+}
+
+// Loads the partial update info stored for (tablet_id, partition_id, txn_id).
+// A META_KEY_NOT_FOUND status is returned as-is and without logging: callers use it to tell
+// "this load was not a partial update" apart from a real failure. Other read errors are
+// logged and returned; PARSE_PROTOBUF_ERROR is returned if the stored bytes do not parse.
+Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                      int64_t partition_id, int64_t txn_id,
+                                                      PartialUpdateInfoPB* partial_update_info_pb) {
+    std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string value;
+    Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
+    if (status.is<META_KEY_NOT_FOUND>()) {
+        return status;
+    }
+    if (!status.ok()) {
+        LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
+                    tablet_id, partition_id, txn_id);
+        return status;
+    }
+    if (!partial_update_info_pb->ParseFromString(value)) {
+        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
+                "fail to parse partial update info content to protobuf object. tablet_id={}, "
+                "partition_id={}, txn_id={}",
+                tablet_id, partition_id, txn_id);
+    }
+    return Status::OK();
+}
+
+// Visits every partial update info persisted in `meta` and calls
+// func(tablet_id, partition_id, txn_id, serialized PartialUpdateInfoPB) for each one.
+// func's return value is handed back to OlapMeta::iterate (the visible caller always returns
+// true; presumably false stops the scan -- confirm against OlapMeta::iterate).
+// Keys that do not have the shape pui_{tablet_id}_{partition_id}_{txn_id} are logged and
+// skipped instead of aborting the whole traversal.
+// NOTE(review): std::stoll throws on non-numeric parts; keys are only written by
+// save_partial_update_info, so that is assumed unreachable.
+Status RowsetMetaManager::traverse_partial_update_info(
+        OlapMeta* meta,
+        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
+    auto traverse_partial_update_info_func = [&func](const std::string& key,
+                                                     const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
+        // This callback returns bool, so a split failure is handled explicitly (log + skip)
+        // rather than via RETURN_IF_ERROR, whose Status would be implicitly converted to bool
+        // and silently end the traversal.
+        Status st = split_string(key, '_', &parts);
+        if (!st.ok()) {
+            LOG_WARNING("failed to split partial update info key={}, err={}", key,
+                        st.to_string());
+            return true;
+        }
+        if (parts.size() != 4) {
+            LOG_WARNING("invalid partial update info key={}, split size={}", key, parts.size());
+            return true;
+        }
+        int64_t tablet_id = std::stoll(parts[1]);
+        int64_t partition_id = std::stoll(parts[2]);
+        int64_t txn_id = std::stoll(parts[3]);
+        return func(tablet_id, partition_id, txn_id, value);
+    };
+    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
+                         traverse_partial_update_info_func);
+}
+
+// Deletes the single partial update info stored for (tablet_id, partition_id, txn_id) and
+// returns the status of OlapMeta::remove.
+Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                     int64_t partition_id, int64_t txn_id) {
+    std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    VLOG_NOTICE << "remove partial update info, key=" << key;
+    return res;
+}
+
+// Deletes the partial update infos identified by (tablet_id, partition_id, txn_id) tuples from
+// `meta` with one batched OlapMeta::remove call. An empty `keys` is a no-op that does not touch
+// the meta store (the trash sweep calls this for every data dir, usually with nothing to do).
+Status RowsetMetaManager::remove_partial_update_infos(
+        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
+    if (keys.empty()) {
+        return Status::OK();
+    }
+    std::vector<std::string> remove_keys;
+    remove_keys.reserve(keys.size());
+    for (const auto& [tablet_id, partition_id, txn_id] : keys) {
+        remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
+                                          partition_id, txn_id));
+    }
+    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
+    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
+    return res;
+}
+
+// Deletes every persisted partial update info of `tablet_id`, across all partitions and txns.
+Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
+                                                                    int64_t tablet_id) {
+    // Keys look like pui_{tablet_id}_{partition_id}_{txn_id} and OlapMeta::iterate scans by key
+    // prefix. The trailing '_' is required: without it the prefix "pui_12" also matches the keys
+    // of tablets 123, 1200, ... and their infos would be deleted too.
+    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
+    std::vector<std::string> remove_keys;
+    auto get_remove_keys_func = [&](const std::string& key, const std::string& value) -> bool {
+        remove_keys.emplace_back(key);
+        return true;
+    };
+    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
+    // log after iterating so the reported size is the number of keys actually collected
+    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
+                << " removed keys size: " << remove_keys.size();
+    if (remove_keys.empty()) {
+        return Status::OK();
+    }
+    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_meta_manager.h 
b/be/src/olap/rowset/rowset_meta_manager.h
index 9517ce3f51a..0cfbb3383e3 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -18,6 +18,8 @@
 #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
 #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
 
+#include <gen_cpp/olap_file.pb.h>
+
 #include <cstdint>
 #include <functional>
 #include <string>
@@ -32,11 +34,15 @@
 namespace doris {
 class OlapMeta;
 class RowsetMetaPB;
+class PartialUpdateInfoPB;
 } // namespace doris
 
 namespace doris {
 namespace {
 const std::string ROWSET_PREFIX = "rst_";
+
+const std::string PARTIAL_UPDATE_INFO_PREFIX = "pui_";
+
 } // namespace
 
 // Helper class for managing rowset meta of one root path.
@@ -80,6 +86,21 @@ public:
 
     static Status load_json_rowset_meta(OlapMeta* meta, const std::string& 
rowset_meta_path);
 
+    // Partial update infos live in the meta column family under
+    // PARTIAL_UPDATE_INFO_PREFIX + "{tablet_id}_{partition_id}_{txn_id}".
+    static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, int64_t partition_id,
+                                           int64_t txn_id,
+                                           const PartialUpdateInfoPB& partial_update_info_pb);
+    // Returns a META_KEY_NOT_FOUND status when nothing was saved for the given txn.
+    static Status try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                              int64_t partition_id, int64_t txn_id,
+                                              PartialUpdateInfoPB* partial_update_info_pb);
+    // Calls func(tablet_id, partition_id, txn_id, serialized_pb) for every stored info.
+    static Status traverse_partial_update_info(
+            OlapMeta* meta,
+            std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func);
+    // Removes the info of one (tablet_id, partition_id, txn_id).
+    static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                             int64_t partition_id, int64_t txn_id);
+    // Removes a batch of infos, each identified by (tablet_id, partition_id, txn_id).
+    static Status remove_partial_update_infos(
+            OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys);
+    // Intended to remove every info stored for `tablet_id`.
+    static Status remove_tablet_related_partial_update_info(OlapMeta* meta, int64_t tablet_id);
+
 private:
     static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& 
rowset_id,
                         const RowsetMetaPB& rowset_meta_pb);
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 32bbdb246a3..4194d3ae6c3 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -40,6 +40,7 @@
 #include "olap/rowset/beta_rowset_writer.h"
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_meta_manager.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/schema_change.h"
@@ -325,10 +326,11 @@ Status RowsetBuilder::commit_txn() {
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
         
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
+
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
-    Status res = _engine.txn_manager()->commit_txn(_req.partition_id, 
*tablet(), _req.txn_id,
-                                                   _req.load_id, _rowset,
-                                                   
std::move(_pending_rs_guard), false);
+    Status res = _engine.txn_manager()->commit_txn(
+            _req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset,
+            std::move(_pending_rs_guard), false, _partial_update_info);
 
     if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
         LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 82c07a59152..f4b11b8fb62 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -63,6 +63,7 @@
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/olap_meta.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/rowset/unique_rowset_id_generator.h"
@@ -807,6 +808,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool 
ignore_guard) {
     // cleand unused pending publish info for deleted tablet
     _clean_unused_pending_publish_info();
 
+    // clean unused partial update info for finished txns
+    _clean_unused_partial_update_info();
+
     // clean unused rowsets in remote storage backends
     for (auto data_dir : get_stores()) {
         data_dir->perform_remote_rowset_gc();
@@ -970,6 +974,34 @@ void StorageEngine::_clean_unused_pending_publish_info() {
     }
 }
 
+// Deletes persisted partial update infos that are no longer needed: the tablet no longer
+// exists, or the TxnManager reports the txn as NOT_FOUND / ABORTED / DELETED.
+// Called from start_trash_sweep(); best-effort, traversal and removal errors are ignored.
+void StorageEngine::_clean_unused_partial_update_info() {
+    auto data_dirs = get_stores();
+    for (auto* data_dir : data_dirs) {
+        // Collect per data dir: every dir has its own meta store, and keys found in one dir
+        // must not be re-issued as removals against the following ones.
+        std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos;
+        auto unused_partial_update_info_collector =
+                [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+                                      std::string_view /*value*/) -> bool {
+            TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+            if (tablet == nullptr) {
+                remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+                return true;
+            }
+            TxnState txn_state = _txn_manager->get_txn_state(partition_id, txn_id, tablet_id,
+                                                             tablet->tablet_uid());
+            if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED ||
+                txn_state == TxnState::DELETED) {
+                remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+            }
+            return true;
+        };
+        static_cast<void>(RowsetMetaManager::traverse_partial_update_info(
+                data_dir->get_meta(), unused_partial_update_info_collector));
+        if (!remove_infos.empty()) {
+            static_cast<void>(RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(),
+                                                                             remove_infos));
+        }
+    }
+}
+
 void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& 
gc_tablet_infos) {
     for (auto [tablet_id, version] : gc_tablet_infos) {
         LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, 
version: {}", tablet_id,
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index f647869e825..5562257133c 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -255,6 +255,8 @@ private:
 
     void _clean_unused_pending_publish_info();
 
+    // Removes persisted partial update infos whose tablet no longer exists or whose txn is
+    // NOT_FOUND / ABORTED / DELETED in the TxnManager; invoked from start_trash_sweep().
+    void _clean_unused_partial_update_info();
+
     Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
                      const int32_t expire);
 
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 2ed1ac5674d..373c398df61 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -33,9 +33,11 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "olap/data_dir.h"
 #include "olap/delta_writer.h"
 #include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_meta_manager.h"
@@ -173,10 +175,11 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, 
TTransactionId transac
 Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
                               TTransactionId transaction_id, const PUniqueId& 
load_id,
                               const RowsetSharedPtr& rowset_ptr, 
PendingRowsetGuard guard,
-                              bool is_recovery) {
+                              bool is_recovery,
+                              std::shared_ptr<PartialUpdateInfo> 
partial_update_info) {
     return commit_txn(tablet.data_dir()->get_meta(), partition_id, 
transaction_id,
                       tablet.tablet_id(), tablet.tablet_uid(), load_id, 
rowset_ptr,
-                      std::move(guard), is_recovery);
+                      std::move(guard), is_recovery, partial_update_info);
 }
 
 Status TxnManager::publish_txn(TPartitionId partition_id, const 
TabletSharedPtr& tablet,
@@ -259,7 +262,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
                               TTransactionId transaction_id, TTabletId 
tablet_id,
                               TabletUid tablet_uid, const PUniqueId& load_id,
                               const RowsetSharedPtr& rowset_ptr, 
PendingRowsetGuard guard,
-                              bool is_recovery) {
+                              bool is_recovery,
+                              std::shared_ptr<PartialUpdateInfo> 
partial_update_info) {
     if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
         LOG(WARNING) << "invalid commit req "
                      << " partition_id=" << partition_id << " transaction_id=" 
<< transaction_id
@@ -369,6 +373,36 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
             save_status.append(fmt::format(", txn id: {}", transaction_id));
             return save_status;
         }
+
+        if (partial_update_info && partial_update_info->is_partial_update) {
+            PartialUpdateInfoPB partial_update_info_pb;
+            partial_update_info->to_pb(&partial_update_info_pb);
+            save_status = RowsetMetaManager::save_partial_update_info(
+                    meta, tablet_id, partition_id, transaction_id, 
partial_update_info_pb);
+            if (!save_status.ok()) {
+                save_status.append(fmt::format(", txn_id: {}", 
transaction_id));
+                return save_status;
+            }
+        }
+    }
+
+    TabletSharedPtr tablet;
+    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
+    if (is_recovery) {
+        tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
+        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+            PartialUpdateInfoPB partial_update_info_pb;
+            auto st = RowsetMetaManager::try_get_partial_update_info(
+                    meta, tablet_id, partition_id, transaction_id, 
&partial_update_info_pb);
+            if (st.ok()) {
+                decoded_partial_update_info = 
std::make_shared<PartialUpdateInfo>();
+                decoded_partial_update_info->from_pb(&partial_update_info_pb);
+                DCHECK(decoded_partial_update_info->is_partial_update);
+            } else if (!st.is<META_KEY_NOT_FOUND>()) {
+                // the load is not a partial update
+                return st;
+            }
+        }
     }
 
     {
@@ -376,11 +410,17 @@ Status TxnManager::commit_txn(OlapMeta* meta, 
TPartitionId partition_id,
         auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
         load_info->pending_rs_guard = std::move(guard);
         if (is_recovery) {
-            TabletSharedPtr tablet = 
_engine.tablet_manager()->get_tablet(tablet_info.tablet_id,
-                                                                          
tablet_info.tablet_uid);
             if (tablet != nullptr && 
tablet->enable_unique_key_merge_on_write()) {
                 load_info->unique_key_merge_on_write = true;
                 load_info->delete_bitmap.reset(new 
DeleteBitmap(tablet->tablet_id()));
+                if (decoded_partial_update_info) {
+                    LOG_INFO(
+                            "get partial update info from RocksDB during 
recovery. txn_id={}, "
+                            "partition_id={}, tablet_id={}, 
partial_update_info=[{}]",
+                            transaction_id, partition_id, tablet_id,
+                            decoded_partial_update_info->summary());
+                    load_info->partial_update_info = 
decoded_partial_update_info;
+                }
             }
         }
         load_info->commit();
@@ -513,6 +553,20 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
         return status;
     }
 
+    if (tablet_txn_info->unique_key_merge_on_write && 
tablet_txn_info->partial_update_info &&
+        tablet_txn_info->partial_update_info->is_partial_update) {
+        status = RowsetMetaManager::remove_partial_update_info(meta, 
tablet_id, partition_id,
+                                                               transaction_id);
+        if (!status) {
+            // discard the error status and print the warning log
+            LOG_WARNING(
+                    "fail to remove partial update info from RocksDB. 
txn_id={}, rowset_id={}, "
+                    "tablet_id={}, tablet_uid={}",
+                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
+                    tablet_uid.to_string());
+        }
+    }
+
     // TODO(Drogon): remove these test codes
     if (enable_binlog) {
         auto version_str = fmt::format("{}", version.first);
@@ -692,6 +746,13 @@ void 
TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
             }
         }
     }
+    if (meta != nullptr) {
+        Status st = 
RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
+        if (!st.ok()) {
+            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", 
tablet_id,
+                        st.to_string());
+        }
+    }
 }
 
 void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 431ce6e49cf..ab34113c7e7 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -36,7 +36,6 @@
 
 #include "common/status.h"
 #include "olap/olap_common.h"
-#include "olap/partial_update_info.h"
 #include "olap/rowset/pending_rowset_helper.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
@@ -52,6 +51,7 @@ namespace doris {
 class DeltaWriter;
 class OlapMeta;
 struct TabletPublishStatistics;
+struct PartialUpdateInfo;
 
 enum class TxnState {
     NOT_FOUND = 0,
@@ -143,8 +143,8 @@ public:
 
     Status commit_txn(TPartitionId partition_id, const Tablet& tablet,
                       TTransactionId transaction_id, const PUniqueId& load_id,
-                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard 
guard,
-                      bool is_recovery);
+                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard 
guard, bool is_recovery,
+                      std::shared_ptr<PartialUpdateInfo> partial_update_info = 
nullptr);
 
     Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& 
tablet,
                        TTransactionId transaction_id, const Version& version,
@@ -159,8 +159,8 @@ public:
 
     Status commit_txn(OlapMeta* meta, TPartitionId partition_id, 
TTransactionId transaction_id,
                       TTabletId tablet_id, TabletUid tablet_uid, const 
PUniqueId& load_id,
-                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard 
guard,
-                      bool is_recovery);
+                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard 
guard, bool is_recovery,
+                      std::shared_ptr<PartialUpdateInfo> partial_update_info = 
nullptr);
 
     // remove a txn from txn manager
     // not persist rowset meta because
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 4d0d3707132..55aee8f07d2 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -391,3 +391,18 @@ message RowsetBinlogMetasPB {
 
     repeated RowsetBinlogMetaPB rowset_binlog_metas = 1;
 }
+// Serialized form of PartialUpdateInfo; persisted in RocksDB so a committed partial update's info survives a BE restart.
+message PartialUpdateInfoPB {
+    optional bool is_partial_update = 1 [default = false];
+    repeated string partial_update_input_columns = 2;
+    repeated uint32 missing_cids = 3;
+    repeated uint32 update_cids = 4;
+    optional bool can_insert_new_rows_in_partial_update = 5 [default = false];
+    optional bool is_strict_mode = 6 [default = false];
+    optional int64 timestamp_ms = 7 [default = 0];
+    optional string timezone = 8;
+    optional bool is_input_columns_contains_auto_inc_column = 9 [default = 
false];
+    optional bool is_schema_contains_auto_inc_column = 10 [default = false];
+    repeated string default_values = 11;
+    optional int64 max_version_in_flush_phase = 12 [default = -1]; // NOTE(review): -1 presumably means "not recorded" — confirm
+}
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/data1.csv 
b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
new file mode 100644
index 00000000000..be9f2feb69a
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
@@ -0,0 +1,2 @@
+1,10,10
+3,30,30
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
new file mode 100644
index 00000000000..6444b41c2c2
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+
+-- !sql --
+1      1       1       99      99
+2      2       2       88      88
+3      3       3       77      77
+
+-- !sql --
+1      10      10      99      99
+2      2       2       88      88
+3      30      30      77      77
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
new file mode 100644
index 00000000000..bc2a44425b3
--- /dev/null
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
@@ -0,0 +1,156 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_partial_update_conflict_be_restart") {
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = false
+    docker(options) {
+        def table1 = "test_partial_update_conflict_be_restart"
+        sql "DROP TABLE IF EXISTS ${table1};"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int,
+                `c4` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+        order_qt_sql "select * from ${table1};"
+        // Commits a prepared two-phase stream load through the FE HTTP API and asserts it succeeded.
+        def do_streamload_2pc_commit = { txnId ->
+            def feNode = sql_return_maparray("show frontends;").get(0)
+            def command = "curl -X PUT --location-trusted -u root:" +
+                    " -H txn_id:${txnId}" +
+                    " -H txn_operation:commit" +
+                    " http://${feNode.Host}:${feNode.HttpPort}/api/${dbName}/${table1}/_stream_load_2pc";
+            log.info("http_stream execute 2pc: ${command}")
+            // drain stdout before waitFor() so a full pipe buffer cannot stall curl
+            def process = command.execute()
+            def out = process.text
+            def code = process.waitFor()
+            def json2pc = parseJson(out)
+            log.info("http_stream 2pc result: ${out}".toString())
+            assertEquals(0, code)
+            assertEquals("success", json2pc.status.toLowerCase())
+        }
+        // Polls SHOW TRANSACTION once per second (at most waitSecond times) and asserts the txn ends up VISIBLE.
+        def wait_for_publish = {txnId, waitSecond ->
+            String st = "PREPARE"
+            while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+                Thread.sleep(1000)
+                waitSecond -= 1
+                def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                assertTrue(result.size() > 0)
+                st = result[0].TransactionStatus
+            }
+            log.info("Stream load with txn ${txnId} is ${st}")
+            assertEquals("VISIBLE", st)
+        }
+        // Load 1: two-phase partial update of (k1, c1, c2); it is not committed until after the BE restart.
+        String txnId1
+        streamLoad {
+            table "${table1}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'k1,c1,c2'
+            set 'strict_mode', "false"
+            set 'two_phase_commit', 'true'
+            file 'data1.csv'
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                txnId1 = json.TxnId
+                assertEquals("success", json.Status.toLowerCase())
+            }
+        }
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        // another partial update that conflicts with the previous load and publishes successfully
+        sql "set enable_unique_key_partial_update=true;"
+        sql "sync;"
+        sql "insert into ${table1}(k1,c3,c4) values(1, 99, 99),(2,88,88),(3,77,77);"
+        sql "set enable_unique_key_partial_update=false;"
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        // restart backend
+        cluster.restartBackends()
+        Thread.sleep(5000)
+
+        // wait for be restart
+        boolean ok = false
+        int cnt = 0
+        for (; cnt < 10; cnt++) {
+            def be = sql_return_maparray("show backends").get(0)
+            if (be.Alive.toBoolean()) {
+                ok = true
+                break;
+            }
+            logger.info("wait for BE restart...")
+            Thread.sleep(1000)
+        }
+        if (!ok) {
+            logger.info("BE failed to restart")
+            assertTrue(false)
+        }
+
+        Thread.sleep(5000)
+        // commit load 1 only now; publishing it relies on partial update info persisted before the restart
+        do_streamload_2pc_commit(txnId1)
+        wait_for_publish(txnId1, 10)
+
+        // keys 1 and 3 take c1/c2 from load 1 while keeping the c3/c4 values from the conflicting load
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+        sql "DROP TABLE IF EXISTS ${table1};"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to