This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 42425808a10 [Cherry-Pick](branch-2.1) Pick "Fix multiple replica partial update auto inc data inconsistency problem #34788" (#35056)
42425808a10 is described below

commit 42425808a10c508a85925a178525672c9c8f8558
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Mon May 20 15:43:46 2024 +0800

    [Cherry-Pick](branch-2.1) Pick "Fix multiple replica partial update auto inc data inconsistency problem #34788" (#35056)

    * [Fix](auto inc) Fix multiple replica partial update auto inc data inconsistency problem (#34788)

    * **Problem:** For tables with an auto-increment column, partial-column
    updates can cause data inconsistency among replicas.

    **Cause:** Previously, partial-column updates on tables with an
    auto-increment column generated the auto-increment values independently on
    each BE (Backend), so different BEs could produce different values for the
    same rows.

    **Solution:** Before distributing blocks, determine whether the load is a
    partial-column update on a table that has an auto-increment column. If so,
    append the generated auto-increment values as the last column of the block
    (see the sketch below). After distribution, each BE checks whether the key
    of the partially updated row already exists: if it does, the existing
    auto-increment value is kept; if not, the value from the appended last
    column is used. This ensures that the auto-increment [...]
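
    A minimal, self-contained sketch of this flow (not the Doris code itself;
    the real logic is in OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols
    and the segment writers' fill_missing_columns in the diff below). The Row
    struct, the maps standing in for replicas, and the function names are
    invented here purely for illustration:

        // Toy model: the sink allocates the auto-increment values once and
        // ships them with the block; replicas only consume them.
        #include <cstdint>
        #include <iostream>
        #include <string>
        #include <unordered_map>
        #include <vector>

        struct Row {
            std::string key;   // key supplied by the partial update
            int64_t auto_inc;  // value appended as the block's last column
        };

        // Sink side: allocate IDs before distribution and append them to the block.
        std::vector<Row> append_auto_inc_column(const std::vector<std::string>& keys,
                                                int64_t& next_id) {
            std::vector<Row> block;
            block.reserve(keys.size());
            for (const auto& key : keys) {
                block.push_back({key, next_id++});  // identical values reach every replica
            }
            return block;
        }

        // Replica side: keep the existing value if the key is already present,
        // otherwise take the value carried in the appended column.
        void apply_partial_update(std::unordered_map<std::string, int64_t>& replica,
                                  const std::vector<Row>& block) {
            for (const auto& row : block) {
                if (replica.find(row.key) == replica.end()) {
                    replica[row.key] = row.auto_inc;  // new key: use the shipped value
                }
                // existing key: the previously stored auto-increment value is kept
            }
        }

        int main() {
            int64_t next_id = 1000;  // pretend this range was fetched centrally
            std::unordered_map<std::string, int64_t> replica_a {{"Bob", 1}};
            std::unordered_map<std::string, int64_t> replica_b {{"Bob", 1}};

            auto block = append_auto_inc_column({"Bob", "Alice", "Tom"}, next_id);
            apply_partial_update(replica_a, block);
            apply_partial_update(replica_b, block);

            // Both replicas now hold identical auto-increment values.
            for (const auto& [key, id] : replica_a) {
                std::cout << key << " -> " << id << " / " << replica_b.at(key) << "\n";
            }
        }

    Because the values are allocated once at the sink and travel with the
    block, every replica assigns the same auto-increment value to the same new
    key, which removes the per-BE allocation that caused the divergence.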
    
    * 2
    
    * [Fix](regression-test) Fix auto inc partial update unstable regression test (#34940)
---
 be/src/exec/tablet_info.cpp                        |   3 +
 be/src/exec/tablet_info.h                          |   2 +
 be/src/olap/delta_writer_v2.cpp                    |   3 +-
 be/src/olap/memtable.cpp                           |  13 +-
 be/src/olap/memtable.h                             |   2 +
 be/src/olap/partial_update_info.h                  |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  30 +--
 be/src/olap/rowset/segment_v2/segment_writer.h     |   7 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +--
 .../rowset/segment_v2/vertical_segment_writer.h    |   6 +-
 be/src/olap/rowset_builder.cpp                     |   3 +-
 be/src/runtime/runtime_state.h                     |   7 +
 be/src/vec/sink/vtablet_block_convertor.cpp        |  49 +++-
 be/src/vec/sink/vtablet_block_convertor.h          |   7 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   6 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   6 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   1 +
 gensrc/proto/descriptors.proto                     |   1 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 ..._auto_inc_partial_update_consistency_insert.out |  97 ++++++++
 ..._inc_partial_update_consistency_stream_load.out |  95 ++++++++
 ...to_inc_partial_update_consistency_insert.groovy | 221 ++++++++++++++++++
 ...c_partial_update_consistency_stream_load.groovy | 255 +++++++++++++++++++++
 23 files changed, 788 insertions(+), 64 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index ff8c272fb22..62ff0b2fcce 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const 
POlapTableSchemaParam& pschema) {
     _is_strict_mode = pschema.is_strict_mode();
     if (_is_partial_update) {
         _auto_increment_column = pschema.auto_increment_column();
+        _auto_increment_column_unique_id = 
pschema.auto_increment_column_unique_id();
     }
     _timestamp_ms = pschema.timestamp_ms();
     _timezone = pschema.timezone();
@@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const 
TOlapTableSchemaParam& tschema) {
     }
     if (_is_partial_update) {
         _auto_increment_column = tschema.auto_increment_column;
+        _auto_increment_column_unique_id = 
tschema.auto_increment_column_unique_id;
     }
 
     for (const auto& tcolumn : tschema.partial_update_input_columns) {
@@ -258,6 +260,7 @@ void 
OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
     pschema->set_partial_update(_is_partial_update);
     pschema->set_is_strict_mode(_is_strict_mode);
     pschema->set_auto_increment_column(_auto_increment_column);
+    
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
     pschema->set_timestamp_ms(_timestamp_ms);
     pschema->set_timezone(_timezone);
     for (auto col : _partial_update_input_columns) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 20f4fa51fc6..fcba8fd8262 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -93,6 +93,7 @@ public:
         return _partial_update_input_columns;
     }
     std::string auto_increment_coulumn() const { return 
_auto_increment_column; }
+    int32_t auto_increment_column_unique_id() const { return 
_auto_increment_column_unique_id; }
     void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = 
timestamp_ms; }
     int64_t timestamp_ms() const { return _timestamp_ms; }
     void set_timezone(std::string timezone) { _timezone = timezone; }
@@ -113,6 +114,7 @@ private:
     std::set<std::string> _partial_update_input_columns;
     bool _is_strict_mode = false;
     std::string _auto_increment_column;
+    int32_t _auto_increment_column_unique_id;
     int64_t _timestamp_ms = 0;
     std::string _timezone;
 };
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 51cef7e9f58..5cfc260d1b5 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -238,7 +238,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t 
index_id,
     _partial_update_info->init(*_tablet_schema, 
table_schema_param->is_partial_update(),
                                
table_schema_param->partial_update_input_columns(),
                                table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone());
+                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone(),
+                               table_schema_param->auto_increment_coulumn());
 }
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 123eb7d8264..2676bf7a32e 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -68,16 +68,22 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* 
tablet_schema,
     _query_thread_context.init();
     _arena = std::make_unique<vectorized::Arena>();
     _vec_row_comparator = 
std::make_shared<RowInBlockComparator>(_tablet_schema);
-    // TODO: Support ZOrderComparator in the future
-    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _num_columns = _tablet_schema->num_columns();
     if (partial_update_info != nullptr) {
         _is_partial_update = partial_update_info->is_partial_update;
         if (_is_partial_update) {
             _num_columns = 
partial_update_info->partial_update_input_columns.size();
+            if (partial_update_info->is_schema_contains_auto_inc_column &&
+                
!partial_update_info->is_input_columns_contains_auto_inc_column) {
+                _is_partial_update_and_auto_inc = true;
+                _num_columns += 1;
+            }
         }
     }
+    // TODO: Support ZOrderComparator in the future
+    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
 }
+
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
                                                   const TupleDescriptor* 
tuple_desc) {
     for (auto slot_desc : *slot_descs) {
@@ -89,6 +95,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescript
             }
         }
     }
+    if (_is_partial_update_and_auto_inc) {
+        _column_offset.emplace_back(_column_offset.size());
+    }
 }
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c95e38fb05a..8362c69222e 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -265,6 +265,8 @@ private:
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;
+
+    bool _is_partial_update_and_auto_inc = false;
 }; // class MemTable
 
 } // namespace doris
diff --git a/be/src/olap/partial_update_info.h 
b/be/src/olap/partial_update_info.h
index 7a22ecf5035..e08021b4f38 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -24,7 +24,8 @@ namespace doris {
 struct PartialUpdateInfo {
     void init(const TabletSchema& tablet_schema, bool partial_update,
               const std::set<string>& partial_update_cols, bool is_strict_mode,
-              int64_t timestamp_ms, const std::string& timezone) {
+              int64_t timestamp_ms, const std::string& timezone,
+              const std::string& auto_increment_column) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
         this->timestamp_ms = timestamp_ms;
@@ -42,8 +43,13 @@ struct PartialUpdateInfo {
             } else {
                 update_cids.emplace_back(i);
             }
+            if (auto_increment_column == tablet_column.name()) {
+                is_schema_contains_auto_inc_column = true;
+            }
         }
         this->is_strict_mode = is_strict_mode;
+        is_input_columns_contains_auto_inc_column =
+                is_partial_update && 
partial_update_input_columns.contains(auto_increment_column);
     }
 
     bool is_partial_update {false};
@@ -56,5 +62,7 @@ struct PartialUpdateInfo {
     bool is_strict_mode {false};
     int64_t timestamp_ms {0};
     std::string timezone;
+    bool is_input_columns_contains_auto_inc_column = false;
+    bool is_schema_contains_auto_inc_column = false;
 };
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index b2543568478..83e93631ab1 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -112,13 +112,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
             const auto& column = 
_tablet_schema->column(_tablet_schema->sequence_col_idx());
             _seq_coder = get_key_coder(column.type());
         }
-        if (!_tablet_schema->auto_increment_column().empty()) {
-            _auto_inc_id_buffer =
-                    
vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
-                            _tablet_schema->db_id(), 
_tablet_schema->table_id(),
-                            
_tablet_schema->column(_tablet_schema->auto_increment_column())
-                                    .unique_id());
-        }
         // encode the rowid into the primary key index
         if (!_tablet_schema->cluster_key_idxes().empty()) {
             const auto* type_info = 
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
@@ -559,7 +552,7 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     // read and fill block
     auto mutable_full_columns = full_block.mutate_columns();
     RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, 
use_default_or_null_flag,
-                                         has_default_or_nullable, 
segment_start_pos));
+                                         has_default_or_nullable, 
segment_start_pos, block));
     full_block.set_columns(std::move(mutable_full_columns));
     // row column should be filled here
     if (_tablet_schema->store_row_column()) {
@@ -618,7 +611,8 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
 Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                            const std::vector<bool>& 
use_default_or_null_flag,
                                            bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
+                                           const size_t& segment_start_pos,
+                                           const vectorized::Block* block) {
     if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
         // TODO(plat1ko): cloud mode
         return Status::NotSupported("fill_missing_columns");
@@ -712,18 +706,6 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
         }
     }
 
-    // deal with partial update auto increment column when there no key in old 
block.
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        if (_auto_inc_id_allocator.total_count < 
use_default_or_null_flag.size()) {
-            std::vector<std::pair<int64_t, size_t>> res;
-            RETURN_IF_ERROR(
-                    
_auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res));
-            for (auto [start, length] : res) {
-                _auto_inc_id_allocator.insert_ids(start, length);
-            }
-        }
-    }
-
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
@@ -751,7 +733,11 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
                            FieldType::OLAP_FIELD_TYPE_BIGINT);
                     auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
                             mutable_full_columns[cids_missing[i]].get());
-                    auto_inc_column->insert(_auto_inc_id_allocator.next_id());
+                    auto_inc_column->insert(
+                            (assert_cast<const vectorized::ColumnInt64*>(
+                                     
block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__")
+                                             .column.get()))
+                                    ->get_element(idx));
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 1adb94aad21..2f26d6158ee 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -40,7 +40,6 @@
 #include "olap/tablet_schema.h"
 #include "util/faststring.h"
 #include "util/slice.h"
-#include "vec/sink/autoinc_buffer.h"
 
 namespace doris {
 namespace vectorized {
@@ -130,7 +129,8 @@ public:
     void set_mow_context(std::shared_ptr<MowContext> mow_context);
     Status fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                 const std::vector<bool>& 
use_default_or_null_flag,
-                                bool has_default_or_nullable, const size_t& 
segment_start_pos);
+                                bool has_default_or_nullable, const size_t& 
segment_start_pos,
+                                const vectorized::Block* block);
 
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
@@ -226,9 +226,6 @@ private:
     // group every rowset-segment row id to speed up reader
     PartialUpdateReadPlan _rssid_to_rid;
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
-
-    std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
-    vectorized::AutoIncIDAllocator _auto_inc_id_allocator;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5cadc9aac6e..5d2ddedb204 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -103,11 +103,6 @@ 
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
         const auto& column = 
_tablet_schema->column(_tablet_schema->sequence_col_idx());
         _seq_coder = get_key_coder(column.type());
     }
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        _auto_inc_id_buffer = 
vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
-                _tablet_schema->db_id(), _tablet_schema->table_id(),
-                
_tablet_schema->column(_tablet_schema->auto_increment_column()).unique_id());
-    }
     if (_tablet_schema->has_inverted_index()) {
         _inverted_index_file_writer = 
std::make_unique<InvertedIndexFileWriter>(
                 _file_writer->fs(), _file_writer->path().parent_path(),
@@ -493,7 +488,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     // read and fill block
     auto mutable_full_columns = full_block.mutate_columns();
     RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, 
use_default_or_null_flag,
-                                          has_default_or_nullable, 
segment_start_pos));
+                                          has_default_or_nullable, 
segment_start_pos, data.block));
     // row column should be filled here
     if (_tablet_schema->store_row_column()) {
         // convert block to row store format
@@ -552,7 +547,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 Status VerticalSegmentWriter::_fill_missing_columns(
         vectorized::MutableColumns& mutable_full_columns,
         const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
-        const size_t& segment_start_pos) {
+        const size_t& segment_start_pos, const vectorized::Block* block) {
     if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
         // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("fill_missing_columns");
@@ -645,18 +640,6 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         }
     }
 
-    // deal with partial update auto increment column when there no key in old 
block.
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        if (_auto_inc_id_allocator.total_count < 
use_default_or_null_flag.size()) {
-            std::vector<std::pair<int64_t, size_t>> res;
-            RETURN_IF_ERROR(
-                    
_auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res));
-            for (auto [start, length] : res) {
-                _auto_inc_id_allocator.insert_ids(start, length);
-            }
-        }
-    }
-
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
@@ -684,7 +667,11 @@ Status VerticalSegmentWriter::_fill_missing_columns(
                            FieldType::OLAP_FIELD_TYPE_BIGINT);
                     auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
                             mutable_full_columns[missing_cids[i]].get());
-                    auto_inc_column->insert(_auto_inc_id_allocator.next_id());
+                    auto_inc_column->insert(
+                            (assert_cast<const vectorized::ColumnInt64*>(
+                                     
block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__")
+                                             .column.get()))
+                                    ->get_element(idx));
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index 7bc6bf7c4fa..02e7170ff51 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -38,7 +38,6 @@
 #include "olap/tablet_schema.h"
 #include "util/faststring.h"
 #include "util/slice.h"
-#include "vec/sink/autoinc_buffer.h"
 
 namespace doris {
 namespace vectorized {
@@ -145,7 +144,8 @@ private:
     Status _append_block_with_partial_content(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                  const std::vector<bool>& 
use_default_or_null_flag,
-                                 bool has_default_or_nullable, const size_t& 
segment_start_pos);
+                                 bool has_default_or_nullable, const size_t& 
segment_start_pos,
+                                 const vectorized::Block* block);
 
 private:
     uint32_t _segment_id;
@@ -193,8 +193,6 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
 
     std::vector<RowsInBlock> _batched_blocks;
-    std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
-    vectorized::AutoIncIDAllocator _auto_inc_id_allocator;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 4355bb6f964..2153a9ad1a8 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -377,7 +377,8 @@ void 
BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
     _partial_update_info->init(*_tablet_schema, 
table_schema_param->is_partial_update(),
                                
table_schema_param->partial_update_input_columns(),
                                table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone());
+                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone(),
+                               table_schema_param->auto_increment_coulumn());
 }
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index abef8615b1e..9f483fdc26e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -42,6 +42,7 @@
 #include "runtime/task_execution_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
+#include "vec/columns/columns_number.h"
 
 namespace doris {
 class IRuntimeFilter;
@@ -628,6 +629,10 @@ public:
 
     int task_num() const { return _task_num; }
 
+    vectorized::ColumnInt64* partial_update_auto_inc_column() {
+        return _partial_update_auto_inc_column;
+    };
+
 private:
     Status create_error_log_file();
 
@@ -755,6 +760,8 @@ private:
 
     // prohibit copies
     RuntimeState(const RuntimeState&);
+
+    vectorized::ColumnInt64* _partial_update_auto_inc_column;
 };
 
 #define RETURN_IF_CANCELLED(state)                                             
       \
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp 
b/be/src/vec/sink/vtablet_block_convertor.cpp
index 678c899d980..d93a654728d 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -19,6 +19,7 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/FrontendService.h>
+#include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
 #include <algorithm>
@@ -46,6 +47,7 @@
 #include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
@@ -66,8 +68,21 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
                 output_vexpr_ctxs, *input_block, block.get()));
     }
 
-    // fill the valus for auto-increment columns
-    if (_auto_inc_col_idx.has_value()) {
+    if (_is_partial_update_and_auto_inc) {
+        // If this load is a partial update and this table has an auto inc column,
+        // e.g. table schema: k1, v1, v2(auto inc)
+        // 1. insert columns include auto inc column
+        // e.g. insert into table (k1, v2) value(a, 1);
+        // we do nothing.
+        // 2. insert columns do not include auto inc column
+        // e.g. insert into table (k1, v1) value(a, a);
+        // we need to fill auto_inc_cols by creating a new column.
+        if (!_auto_inc_col_idx.has_value()) {
+            RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), 
rows));
+        }
+    } else if (_auto_inc_col_idx.has_value()) {
+        // fill the values for auto-increment columns
+        DCHECK_EQ(_is_partial_update_and_auto_inc, false);
         RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
     }
 
@@ -91,8 +106,16 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
     return Status::OK();
 }
 
-void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t 
table_id, int batch_size) {
+void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t 
table_id, int batch_size,
+                                                bool 
is_partial_update_and_auto_inc,
+                                                int32_t 
auto_increment_column_unique_id) {
     _batch_size = batch_size;
+    if (is_partial_update_and_auto_inc) {
+        _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc;
+        _auto_inc_id_buffer = 
GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
+                db_id, table_id, auto_increment_column_unique_id);
+        return;
+    }
     for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) {
         if (_output_tuple_desc->slots()[idx]->is_auto_increment()) {
             _auto_inc_col_idx = idx;
@@ -522,4 +545,24 @@ Status 
OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si
     return Status::OK();
 }
 
+Status 
OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* 
block,
+                                                                   size_t 
rows) {
+    auto dst_column = vectorized::ColumnInt64::create();
+    vectorized::ColumnInt64::Container& dst_values = dst_column->get_data();
+    size_t null_value_count = rows;
+    std::vector<std::pair<int64_t, size_t>> res;
+    RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, 
&res));
+    for (auto [start, length] : res) {
+        _auto_inc_id_allocator.insert_ids(start, length);
+    }
+
+    for (size_t i = 0; i < rows; i++) {
+        dst_values.emplace_back(_auto_inc_id_allocator.next_id());
+    }
+    block->insert(vectorized::ColumnWithTypeAndName(std::move(dst_column),
+                                                    
std::make_shared<DataTypeNumber<Int64>>(),
+                                                    
"__PARTIAL_UPDATE_AUTO_INC_COLUMN__"));
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_block_convertor.h 
b/be/src/vec/sink/vtablet_block_convertor.h
index 4eaaef3869c..0db340ce6c2 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -53,7 +53,9 @@ public:
 
     int64_t num_filtered_rows() const { return _num_filtered_rows; }
 
-    void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size);
+    void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size,
+                           bool is_partial_update_and_auto_inc = false,
+                           int32_t auto_increment_column_unique_id = -1);
 
     AutoIncIDAllocator& auto_inc_id_allocator() { return 
_auto_inc_id_allocator; }
 
@@ -82,6 +84,8 @@ private:
 
     Status _fill_auto_inc_cols(vectorized::Block* block, size_t rows);
 
+    Status _partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t 
rows);
+
     TupleDescriptor* _output_tuple_desc = nullptr;
 
     std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
@@ -105,6 +109,7 @@ private:
     std::optional<size_t> _auto_inc_col_idx;
     std::shared_ptr<AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
     AutoIncIDAllocator _auto_inc_id_allocator;
+    bool _is_partial_update_and_auto_inc = false;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3fb17850eff..0a7238e9f5d 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1178,8 +1178,10 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     }
 
     _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
+    _block_convertor->init_autoinc_info(
+            _schema->db_id(), _schema->table_id(), _state->batch_size(),
+            _schema->is_partial_update() && 
!_schema->auto_increment_coulumn().empty(),
+            _schema->auto_increment_column_unique_id());
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index b883d8e87c9..c1b43722c33 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -213,8 +213,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     }
 
     _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
+    _block_convertor->init_autoinc_info(
+            _schema->db_id(), _schema->table_id(), _state->batch_size(),
+            _schema->is_partial_update() && 
!_schema->auto_increment_coulumn().empty(),
+            _schema->auto_increment_column_unique_id());
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d180c723398..ada7c6b770b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -312,6 +312,7 @@ public class OlapTableSink extends DataSink {
             for (Column col : table.getFullSchema()) {
                 if (col.isAutoInc()) {
                     schemaParam.setAutoIncrementColumn(col.getName());
+                    
schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId());
                 }
             }
         }
diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto
index 9d6945becc0..13c069f414f 100644
--- a/gensrc/proto/descriptors.proto
+++ b/gensrc/proto/descriptors.proto
@@ -72,5 +72,6 @@ message POlapTableSchemaParam {
     optional string auto_increment_column = 10;
     optional int64 timestamp_ms = 11 [default = 0];
     optional string timezone = 12;
+    optional int32 auto_increment_column_unique_id = 13;
 };
 
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index d82f74d771c..ef7a8451684 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -245,6 +245,7 @@ struct TOlapTableSchemaParam {
     9: optional list<string> partial_update_input_columns
     10: optional bool is_strict_mode = false
     11: optional string auto_increment_column
+    12: optional i32 auto_increment_column_unique_id = -1
 }
 
 struct TTabletLocation {
diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
new file mode 100644
index 00000000000..79a08299118
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
@@ -0,0 +1,97 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1_1 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select1_2 --
+
+-- !select1_3 --
+Alice  200
+Beata  723
+Bob    123
+Carter 523
+Doris  800
+Nereids        923
+Smith  600
+Test   400
+Tom    323
+
+-- !select1_4 --
+
+-- !select2_1 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select2_2 --
+
+-- !select2_3 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select2_4 --
+
+-- !select3_1 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select3_2 --
+
+-- !select3_3 --
+Alice  200
+Beata  9996
+Bob    9990
+Carter 9994
+Doris  800
+Nereids        9998
+Smith  600
+Test   400
+Tom    9992
+
+-- !select3_4 --
+
+-- !select3_5 --
+Alice  200
+BBBBeata       9996
+BBBBob 9990
+Beata  9996
+Bob    9990
+CCCCarter      9994
+Carter 9994
+Doris  800
+NNNNereids     9998
+Nereids        9998
+Smith  600
+TTTTom 9992
+Test   400
+Tom    9992
+
+-- !select3_6 --
+
diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
new file mode 100644
index 00000000000..a31e438f292
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
@@ -0,0 +1,95 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1_1 --
+1      Bob     100
+2      Alice   200
+3      Tom     300
+4      Test    400
+5      Carter  500
+6      Smith   600
+7      Beata   700
+8      Doris   800
+9      Nereids 900
+
+-- !select1_1 --
+Alice  200
+Beata  723
+Bob    123
+Carter 523
+Doris  800
+Nereids        923
+Smith  600
+Test   400
+Tom    323
+
+-- !select1_2 --
+
+-- !select2_1 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select2_2 --
+
+-- !select2_3 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select2_4 --
+
+-- !select3_1 --
+Alice  200
+Beata  700
+Bob    100
+Carter 500
+Doris  800
+Nereids        900
+Smith  600
+Test   400
+Tom    300
+
+-- !select3_2 --
+
+-- !select3_3 --
+Alice  200
+Beata  9996
+Bob    9990
+Carter 9994
+Doris  800
+Nereids        9998
+Smith  600
+Test   400
+Tom    9992
+
+-- !select3_4 --
+
+-- !select3_5 --
+Alice  200
+BBeata 9996
+BBob   9990
+Beata  9996
+Bob    9990
+CCarter        9994
+Carter 9994
+Doris  800
+NNereids       9998
+Nereids        9998
+Smith  600
+TTom   9992
+Test   400
+Tom    9992
+
+-- !select3_6 --
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
new file mode 100644
index 00000000000..4e2c8a5cbd8
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_partial_update_correct_insert") {
+
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    def targetBackend = null
+    for (def be : backends) {
+        def alive = be.Alive.toBoolean()
+        def decommissioned = be.SystemDecommissioned.toBoolean()
+        if (alive && !decommissioned) {
+            replicaNum++
+            targetBackend = be
+        }
+    }
+    assertTrue(replicaNum > 0)
+
+    def check_data_correct = { def tableName ->
+        def old_result = sql "select id from ${tableName} order by id;"
+        logger.info("first result: " + old_result)
+        for (int i = 1; i<30; ++i){
+            def new_result = sql "select id from ${tableName} order by id;"
+            logger.info("new result: " + new_result)
+            for (int j = 0; j<old_result.size();++j){
+                if (old_result[j][0]!=new_result[j][0]){
+                    logger.info("table name: " + tableName)
+                    logger.info("old result: " + old_result)
+                    logger.info("new result: " + new_result)
+                    assertTrue(false)
+                }
+            }
+            old_result = new_result
+        }
+    }
+    
+    // test for partial update, auto inc col is key
+    def table1 = "unique_auto_inc_col_key_partial_update_insert"
+    sql "drop table if exists ${table1}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table1}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table1} (name, value) values 
('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)"
+
+    qt_select1_1 "select name, value from ${table1} order by name, value;"
+    qt_select1_2 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+
+    // 1, 123
+    // 3, 323
+    // 5, 523
+    // 7, 723
+    // 9, 923
+    sql "insert into ${table1} (id, value) values 
(1,123),(3,323),(5,523),(7,723),(9,923)"
+    qt_select1_3 "select name, value from ${table1} order by name, value;"
+    qt_select1_4 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    sql "drop table if exists ${table1};"
+
+    // test for partial update, auto inc col is value, update auto inc col
+    def table2 = "unique_auto_inc_col_value_partial_update_insert"
+    sql "drop table if exists ${table2}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table2}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table2} (name, value) values ('Bob',100)"
+    sql "insert into ${table2} (name, value) values ('Alice',200)"
+    sql "insert into ${table2} (name, value) values ('Tom',300)"
+    sql "insert into ${table2} (name, value) values ('Test',400)"
+    sql "insert into ${table2} (name, value) values ('Carter',500)"
+    sql "insert into ${table2} (name, value) values ('Smith',600)"
+    sql "insert into ${table2} (name, value) values ('Beata',700)"
+    sql "insert into ${table2} (name, value) values ('Doris',800)"
+    sql "insert into ${table2} (name, value) values ('Nereids',900)"
+    qt_select2_1 "select name, value from ${table2} order by name, value;"
+    qt_select2_2 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    sql "insert into ${table2} (name, id) values 
('Bob',9990),('Tom',9992),('Carter',9994),('Beata',9996),('Nereids',9998)"
+    qt_select2_3 "select name, value from ${table2} order by name, value;"
+    qt_select2_4 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+    sql "drop table if exists ${table2};"
+
+    // test for partial update, auto inc col is value, update other col
+    def table3 = "unique_auto_inc_col_value_partial_update_insert"
+    sql "drop table if exists ${table3}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table3}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    sql "set enable_unique_key_partial_update=false;"
+    sql "set enable_insert_strict=true;"
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table2} (name, value) values 
('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)"
+    qt_select3_1 "select name, value from ${table3} order by name, value;"
+    qt_select3_2 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    sql "insert into ${table2} (name, value) values ('Bob',9990)"
+    sql "insert into ${table2} (name, value) values ('Tom',9992)"
+    sql "insert into ${table2} (name, value) values ('Carter',9994)"
+    sql "insert into ${table2} (name, value) values ('Beata',9996)"
+    sql "insert into ${table2} (name, value) values ('Nereids',9998)"
+    qt_select3_3 "select name, value from ${table3} order by name, value;"
+    qt_select3_4 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    sql "insert into ${table2} (name, value) values ('BBBBob',9990)"
+    sql "insert into ${table2} (name, value) values ('TTTTom',9992)"
+    sql "insert into ${table2} (name, value) values ('CCCCarter',9994)"
+    sql "insert into ${table2} (name, value) values ('BBBBeata',9996)"
+    sql "insert into ${table2} (name, value) values ('NNNNereids',9998)"
+    qt_select3_5 "select name, value from ${table3} order by name, value;"
+    qt_select3_6 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    sql "drop table if exists ${table3};"
+}
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
new file mode 100644
index 00000000000..474794deb9b
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_partial_update_correct_stream_load") {
+
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    def targetBackend = null
+    for (def be : backends) {
+        def alive = be.Alive.toBoolean()
+        def decommissioned = be.SystemDecommissioned.toBoolean()
+        if (alive && !decommissioned) {
+            replicaNum++
+            targetBackend = be
+        }
+    }
+    assertTrue(replicaNum > 0)
+    
+    def check_data_correct = { def tableName ->
+        def old_result = sql "select id from ${tableName} order by id;"
+        logger.info("first result: " + old_result)
+        for (int i = 1; i<30; ++i){
+            def new_result = sql "select id from ${tableName} order by id;"
+            logger.info("new result: " + new_result)
+            for (int j = 0; j<old_result.size();++j){
+                if (old_result[j][0]!=new_result[j][0]){
+                    logger.info("table name: " + tableName)
+                    logger.info("old result: " + old_result)
+                    logger.info("new result: " + new_result)
+                    assertTrue(false)
+                }
+            }
+            old_result = new_result
+        }
+    }
+    // test for partial update, auto inc col is key
+    def table1 = "unique_auto_inc_col_key_partial_update_stream_load"
+    sql "drop table if exists ${table1}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table1}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    streamLoad {
+        table "${table1}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select1_1 "select * from ${table1} order by id;"
+
+    // 1, 123
+    // 3, 323
+    // 5, 523
+    // 7, 723
+    // 9, 923
+    streamLoad {
+        table "${table1}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'id, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update1.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select1_1 "select name, value from ${table1} order by name, value;"
+    qt_select1_2 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    sql "drop table if exists ${table1};"
+
+    // test for partial update, auto inc col is value, update auto inc col
+    def table2 = "unique_auto_inc_col_value_partial_update_stream_load"
+    sql "drop table if exists ${table2}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table2}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    streamLoad {
+        table "${table2}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select2_1 "select name, value from ${table2} order by name, value;"
+    qt_select2_2 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    streamLoad {
+        table "${table2}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, id'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update2.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select2_3 "select name, value from ${table2} order by name, value;"
+    qt_select2_4 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+    sql "drop table if exists ${table2};"
+
+    // test for partial update, auto inc col is value, update other col
+    def table3 = "unique_auto_inc_col_value_partial_update_stream_load"
+    sql "drop table if exists ${table3}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table3}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select3_1 "select name, value from ${table3} order by name, value;"
+    qt_select3_2 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update2.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select3_3 "select name, value from ${table3} order by name, value;"
+    qt_select3_4 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    // BBob, 9990
+    // TTom, 9992
+    // CCarter, 9994
+    // BBeata, 9996
+    // NNereids, 9998
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update3.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select3_5 "select name, value from ${table3} order by name, value;"
+    qt_select3_6 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    sql "drop table if exists ${table3};"
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
