This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db88a5937d8 [Fix](auto inc) Fix multiple replica partial update auto 
inc data inconsistency problem (#34788)
db88a5937d8 is described below

commit db88a5937d8649d452e2e84ae73ee560506d32eb
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed May 15 17:19:54 2024 +0800

    [Fix](auto inc) Fix multiple replica partial update auto inc data 
inconsistency problem (#34788)
    
    * **Problem:** For tables with auto-increment columns, updating partial 
columns can cause data inconsistency among replicas.
    
    **Cause:** Previously, when a partial column update on a table with an 
auto-increment column omitted that column, each BE (Backend) generated the 
missing auto-increment values independently, so different replicas could end up 
with different values for the same row.
    
    **Solution:** Before distributing blocks, determine whether the load is a 
partial column update on a table with an auto-increment column. If so, append 
the auto-increment column to the block as its last column. After the block is 
distributed, each BE checks whether the key of each partially updated row 
already exists. If it exists, the previous auto-increment column value is used; 
if not, the auto-increment value from the last column of the block is used. This 
ensures that the auto-incremen [...]
    
    * 2
---
 be/src/exec/tablet_info.cpp                        |   3 +
 be/src/exec/tablet_info.h                          |   2 +
 be/src/olap/delta_writer_v2.cpp                    |   3 +-
 be/src/olap/memtable.cpp                           |  13 +-
 be/src/olap/memtable.h                             |   2 +
 be/src/olap/partial_update_info.h                  |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  30 +--
 be/src/olap/rowset/segment_v2/segment_writer.h     |   7 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +--
 .../rowset/segment_v2/vertical_segment_writer.h    |   6 +-
 be/src/olap/rowset_builder.cpp                     |   3 +-
 be/src/runtime/runtime_state.h                     |   7 +
 be/src/vec/sink/vtablet_block_convertor.cpp        |  49 +++-
 be/src/vec/sink/vtablet_block_convertor.h          |   7 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   6 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   6 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   1 +
 gensrc/proto/descriptors.proto                     |   1 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 ..._auto_inc_partial_update_consistency_insert.out |  97 ++++++++
 ..._inc_partial_update_consistency_stream_load.out |  95 ++++++++
 ...to_inc_partial_update_consistency_insert.groovy | 221 ++++++++++++++++++
 ...c_partial_update_consistency_stream_load.groovy | 255 +++++++++++++++++++++
 23 files changed, 788 insertions(+), 64 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index ff8c272fb22..62ff0b2fcce 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const 
POlapTableSchemaParam& pschema) {
     _is_strict_mode = pschema.is_strict_mode();
     if (_is_partial_update) {
         _auto_increment_column = pschema.auto_increment_column();
+        _auto_increment_column_unique_id = 
pschema.auto_increment_column_unique_id();
     }
     _timestamp_ms = pschema.timestamp_ms();
     _timezone = pschema.timezone();
@@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const 
TOlapTableSchemaParam& tschema) {
     }
     if (_is_partial_update) {
         _auto_increment_column = tschema.auto_increment_column;
+        _auto_increment_column_unique_id = 
tschema.auto_increment_column_unique_id;
     }
 
     for (const auto& tcolumn : tschema.partial_update_input_columns) {
@@ -258,6 +260,7 @@ void 
OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
     pschema->set_partial_update(_is_partial_update);
     pschema->set_is_strict_mode(_is_strict_mode);
     pschema->set_auto_increment_column(_auto_increment_column);
+    
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
     pschema->set_timestamp_ms(_timestamp_ms);
     pschema->set_timezone(_timezone);
     for (auto col : _partial_update_input_columns) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 20f4fa51fc6..fcba8fd8262 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -93,6 +93,7 @@ public:
         return _partial_update_input_columns;
     }
     std::string auto_increment_coulumn() const { return 
_auto_increment_column; }
+    int32_t auto_increment_column_unique_id() const { return 
_auto_increment_column_unique_id; }
     void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = 
timestamp_ms; }
     int64_t timestamp_ms() const { return _timestamp_ms; }
     void set_timezone(std::string timezone) { _timezone = timezone; }
@@ -113,6 +114,7 @@ private:
     std::set<std::string> _partial_update_input_columns;
     bool _is_strict_mode = false;
     std::string _auto_increment_column;
+    int32_t _auto_increment_column_unique_id;
     int64_t _timestamp_ms = 0;
     std::string _timezone;
 };
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 164ee894e32..34c03fee95e 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -238,7 +238,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t 
index_id,
     _partial_update_info->init(*_tablet_schema, 
table_schema_param->is_partial_update(),
                                
table_schema_param->partial_update_input_columns(),
                                table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone());
+                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone(),
+                               table_schema_param->auto_increment_coulumn());
 }
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3ee093c2b37..d26036252b4 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -68,16 +68,22 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* 
tablet_schema,
     _query_thread_context.init_unlocked();
     _arena = std::make_unique<vectorized::Arena>();
     _vec_row_comparator = 
std::make_shared<RowInBlockComparator>(_tablet_schema);
-    // TODO: Support ZOrderComparator in the future
-    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _num_columns = _tablet_schema->num_columns();
     if (partial_update_info != nullptr) {
         _is_partial_update = partial_update_info->is_partial_update;
         if (_is_partial_update) {
             _num_columns = 
partial_update_info->partial_update_input_columns.size();
+            if (partial_update_info->is_schema_contains_auto_inc_column &&
+                
!partial_update_info->is_input_columns_contains_auto_inc_column) {
+                _is_partial_update_and_auto_inc = true;
+                _num_columns += 1;
+            }
         }
     }
+    // TODO: Support ZOrderComparator in the future
+    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
 }
+
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
                                                   const TupleDescriptor* 
tuple_desc) {
     for (auto slot_desc : *slot_descs) {
@@ -89,6 +95,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescript
             }
         }
     }
+    if (_is_partial_update_and_auto_inc) {
+        _column_offset.emplace_back(_column_offset.size());
+    }
 }
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c95e38fb05a..8362c69222e 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -265,6 +265,8 @@ private:
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;
+
+    bool _is_partial_update_and_auto_inc = false;
 }; // class MemTable
 
 } // namespace doris
diff --git a/be/src/olap/partial_update_info.h 
b/be/src/olap/partial_update_info.h
index 7a22ecf5035..e08021b4f38 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -24,7 +24,8 @@ namespace doris {
 struct PartialUpdateInfo {
     void init(const TabletSchema& tablet_schema, bool partial_update,
               const std::set<string>& partial_update_cols, bool is_strict_mode,
-              int64_t timestamp_ms, const std::string& timezone) {
+              int64_t timestamp_ms, const std::string& timezone,
+              const std::string& auto_increment_column) {
         is_partial_update = partial_update;
         partial_update_input_columns = partial_update_cols;
         this->timestamp_ms = timestamp_ms;
@@ -42,8 +43,13 @@ struct PartialUpdateInfo {
             } else {
                 update_cids.emplace_back(i);
             }
+            if (auto_increment_column == tablet_column.name()) {
+                is_schema_contains_auto_inc_column = true;
+            }
         }
         this->is_strict_mode = is_strict_mode;
+        is_input_columns_contains_auto_inc_column =
+                is_partial_update && 
partial_update_input_columns.contains(auto_increment_column);
     }
 
     bool is_partial_update {false};
@@ -56,5 +62,7 @@ struct PartialUpdateInfo {
     bool is_strict_mode {false};
     int64_t timestamp_ms {0};
     std::string timezone;
+    bool is_input_columns_contains_auto_inc_column = false;
+    bool is_schema_contains_auto_inc_column = false;
 };
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 939c504580f..a4c2d988ab9 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -117,13 +117,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
             const auto& column = 
_tablet_schema->column(_tablet_schema->sequence_col_idx());
             _seq_coder = get_key_coder(column.type());
         }
-        if (!_tablet_schema->auto_increment_column().empty()) {
-            _auto_inc_id_buffer =
-                    
vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
-                            _tablet_schema->db_id(), 
_tablet_schema->table_id(),
-                            
_tablet_schema->column(_tablet_schema->auto_increment_column())
-                                    .unique_id());
-        }
         // encode the rowid into the primary key index
         if (!_tablet_schema->cluster_key_idxes().empty()) {
             const auto* type_info = 
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
@@ -559,7 +552,7 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     // read and fill block
     auto mutable_full_columns = full_block.mutate_columns();
     RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, 
use_default_or_null_flag,
-                                         has_default_or_nullable, 
segment_start_pos));
+                                         has_default_or_nullable, 
segment_start_pos, block));
     full_block.set_columns(std::move(mutable_full_columns));
     // row column should be filled here
     if (_tablet_schema->store_row_column()) {
@@ -618,7 +611,8 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
 Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                            const std::vector<bool>& 
use_default_or_null_flag,
                                            bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
+                                           const size_t& segment_start_pos,
+                                           const vectorized::Block* block) {
     if (config::is_cloud_mode()) {
         // TODO(plat1ko): cloud mode
         return Status::NotSupported("fill_missing_columns");
@@ -712,18 +706,6 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
         }
     }
 
-    // deal with partial update auto increment column when there no key in old 
block.
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        if (_auto_inc_id_allocator.total_count < 
use_default_or_null_flag.size()) {
-            std::vector<std::pair<int64_t, size_t>> res;
-            RETURN_IF_ERROR(
-                    
_auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res));
-            for (auto [start, length] : res) {
-                _auto_inc_id_allocator.insert_ids(start, length);
-            }
-        }
-    }
-
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
@@ -751,7 +733,11 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
                            FieldType::OLAP_FIELD_TYPE_BIGINT);
                     auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
                             mutable_full_columns[cids_missing[i]].get());
-                    auto_inc_column->insert(_auto_inc_id_allocator.next_id());
+                    auto_inc_column->insert(
+                            (assert_cast<const vectorized::ColumnInt64*>(
+                                     
block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__")
+                                             .column.get()))
+                                    ->get_element(idx));
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index d1d38f7773f..2b04f36ac2d 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -41,7 +41,6 @@
 #include "olap/tablet_schema.h"
 #include "util/faststring.h"
 #include "util/slice.h"
-#include "vec/sink/autoinc_buffer.h"
 
 namespace doris {
 namespace vectorized {
@@ -132,7 +131,8 @@ public:
     void set_mow_context(std::shared_ptr<MowContext> mow_context);
     Status fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                 const std::vector<bool>& 
use_default_or_null_flag,
-                                bool has_default_or_nullable, const size_t& 
segment_start_pos);
+                                bool has_default_or_nullable, const size_t& 
segment_start_pos,
+                                const vectorized::Block* block);
 
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
@@ -228,9 +228,6 @@ private:
     // group every rowset-segment row id to speed up reader
     PartialUpdateReadPlan _rssid_to_rid;
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
-
-    std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
-    vectorized::AutoIncIDAllocator _auto_inc_id_allocator;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 7466f7861c5..448a2b7304c 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -107,11 +107,6 @@ 
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32
         const auto& column = 
_tablet_schema->column(_tablet_schema->sequence_col_idx());
         _seq_coder = get_key_coder(column.type());
     }
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        _auto_inc_id_buffer = 
vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
-                _tablet_schema->db_id(), _tablet_schema->table_id(),
-                
_tablet_schema->column(_tablet_schema->auto_increment_column()).unique_id());
-    }
     if (_tablet_schema->has_inverted_index()) {
         _inverted_index_file_writer = 
std::make_unique<InvertedIndexFileWriter>(
                 fs ? fs : io::global_local_filesystem(), 
_file_writer->path().parent_path(),
@@ -492,7 +487,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     // read and fill block
     auto mutable_full_columns = full_block.mutate_columns();
     RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, 
use_default_or_null_flag,
-                                          has_default_or_nullable, 
segment_start_pos));
+                                          has_default_or_nullable, 
segment_start_pos, data.block));
     // row column should be filled here
     if (_tablet_schema->store_row_column()) {
         // convert block to row store format
@@ -551,7 +546,7 @@ Status 
VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 Status VerticalSegmentWriter::_fill_missing_columns(
         vectorized::MutableColumns& mutable_full_columns,
         const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
-        const size_t& segment_start_pos) {
+        const size_t& segment_start_pos, const vectorized::Block* block) {
     auto tablet = static_cast<Tablet*>(_tablet.get());
     // create old value columns
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
@@ -640,18 +635,6 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         }
     }
 
-    // deal with partial update auto increment column when there no key in old 
block.
-    if (!_tablet_schema->auto_increment_column().empty()) {
-        if (_auto_inc_id_allocator.total_count < 
use_default_or_null_flag.size()) {
-            std::vector<std::pair<int64_t, size_t>> res;
-            RETURN_IF_ERROR(
-                    
_auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res));
-            for (auto [start, length] : res) {
-                _auto_inc_id_allocator.insert_ids(start, length);
-            }
-        }
-    }
-
     // fill all missing value from mutable_old_columns, need to consider 
default value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         // `use_default_or_null_flag[idx] == true` doesn't mean that we should 
read values from the old row
@@ -679,7 +662,11 @@ Status VerticalSegmentWriter::_fill_missing_columns(
                            FieldType::OLAP_FIELD_TYPE_BIGINT);
                     auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
                             mutable_full_columns[missing_cids[i]].get());
-                    auto_inc_column->insert(_auto_inc_id_allocator.next_id());
+                    auto_inc_column->insert(
+                            (assert_cast<const vectorized::ColumnInt64*>(
+                                     
block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__")
+                                             .column.get()))
+                                    ->get_element(idx));
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index ac328063b35..6c853aa185f 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -38,7 +38,6 @@
 #include "olap/tablet_schema.h"
 #include "util/faststring.h"
 #include "util/slice.h"
-#include "vec/sink/autoinc_buffer.h"
 
 namespace doris {
 namespace vectorized {
@@ -147,7 +146,8 @@ private:
     Status _append_block_with_partial_content(RowsInBlock& data);
     Status _fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
                                  const std::vector<bool>& 
use_default_or_null_flag,
-                                 bool has_default_or_nullable, const size_t& 
segment_start_pos);
+                                 bool has_default_or_nullable, const size_t& 
segment_start_pos,
+                                 const vectorized::Block* block);
 
 private:
     uint32_t _segment_id;
@@ -195,8 +195,6 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
 
     std::vector<RowsInBlock> _batched_blocks;
-    std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
-    vectorized::AutoIncIDAllocator _auto_inc_id_allocator;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 9d8b8163b71..7fdaa41cc74 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -383,7 +383,8 @@ void 
BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
     _partial_update_info->init(*_tablet_schema, 
table_schema_param->is_partial_update(),
                                
table_schema_param->partial_update_input_columns(),
                                table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone());
+                               table_schema_param->timestamp_ms(), 
table_schema_param->timezone(),
+                               table_schema_param->auto_increment_coulumn());
 }
 
 } // namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 3f06df6c0b9..f2e2c887571 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -43,6 +43,7 @@
 #include "runtime/task_execution_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
+#include "vec/columns/columns_number.h"
 
 namespace doris {
 class IRuntimeFilter;
@@ -628,6 +629,10 @@ public:
 
     int task_num() const { return _task_num; }
 
+    vectorized::ColumnInt64* partial_update_auto_inc_column() {
+        return _partial_update_auto_inc_column;
+    };
+
 private:
     Status create_error_log_file();
 
@@ -757,6 +762,8 @@ private:
 
     // prohibit copies
     RuntimeState(const RuntimeState&);
+
+    vectorized::ColumnInt64* _partial_update_auto_inc_column;
 };
 
 #define RETURN_IF_CANCELLED(state)                                             
       \
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp 
b/be/src/vec/sink/vtablet_block_convertor.cpp
index 678c899d980..d93a654728d 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -19,6 +19,7 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/FrontendService.h>
+#include <glog/logging.h>
 #include <google/protobuf/stubs/common.h>
 
 #include <algorithm>
@@ -46,6 +47,7 @@
 #include "vec/common/assert_cast.h"
 #include "vec/core/block.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr.h"
@@ -66,8 +68,21 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
                 output_vexpr_ctxs, *input_block, block.get()));
     }
 
-    // fill the valus for auto-increment columns
-    if (_auto_inc_col_idx.has_value()) {
+    if (_is_partial_update_and_auto_inc) {
+        // If this load is partial update and this table has a auto inc column,
+        // e.g. table schema: k1, v1, v2(auto inc)
+        // 1. insert columns include auto inc column
+        // e.g. insert into table (k1, v2) value(a, 1);
+        // we do nothing.
+        // 2. insert columns do not include auto inc column
+        // e.g. insert into table (k1, v1) value(a, a);
+        // we need to fill auto_inc_cols by creating a new column.
+        if (!_auto_inc_col_idx.has_value()) {
+            RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), 
rows));
+        }
+    } else if (_auto_inc_col_idx.has_value()) {
+        // fill the valus for auto-increment columns
+        DCHECK_EQ(_is_partial_update_and_auto_inc, false);
         RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
     }
 
@@ -91,8 +106,16 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
     return Status::OK();
 }
 
-void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t 
table_id, int batch_size) {
+void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t 
table_id, int batch_size,
+                                                bool 
is_partial_update_and_auto_inc,
+                                                int32_t 
auto_increment_column_unique_id) {
     _batch_size = batch_size;
+    if (is_partial_update_and_auto_inc) {
+        _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc;
+        _auto_inc_id_buffer = 
GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
+                db_id, table_id, auto_increment_column_unique_id);
+        return;
+    }
     for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) {
         if (_output_tuple_desc->slots()[idx]->is_auto_increment()) {
             _auto_inc_col_idx = idx;
@@ -522,4 +545,24 @@ Status 
OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si
     return Status::OK();
 }
 
+Status 
OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* 
block,
+                                                                   size_t 
rows) {
+    auto dst_column = vectorized::ColumnInt64::create();
+    vectorized::ColumnInt64::Container& dst_values = dst_column->get_data();
+    size_t null_value_count = rows;
+    std::vector<std::pair<int64_t, size_t>> res;
+    RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, 
&res));
+    for (auto [start, length] : res) {
+        _auto_inc_id_allocator.insert_ids(start, length);
+    }
+
+    for (size_t i = 0; i < rows; i++) {
+        dst_values.emplace_back(_auto_inc_id_allocator.next_id());
+    }
+    block->insert(vectorized::ColumnWithTypeAndName(std::move(dst_column),
+                                                    
std::make_shared<DataTypeNumber<Int64>>(),
+                                                    
"__PARTIAL_UPDATE_AUTO_INC_COLUMN__"));
+    return Status::OK();
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_block_convertor.h 
b/be/src/vec/sink/vtablet_block_convertor.h
index 4eaaef3869c..0db340ce6c2 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -53,7 +53,9 @@ public:
 
     int64_t num_filtered_rows() const { return _num_filtered_rows; }
 
-    void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size);
+    void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size,
+                           bool is_partial_update_and_auto_inc = false,
+                           int32_t auto_increment_column_unique_id = -1);
 
     AutoIncIDAllocator& auto_inc_id_allocator() { return 
_auto_inc_id_allocator; }
 
@@ -82,6 +84,8 @@ private:
 
     Status _fill_auto_inc_cols(vectorized::Block* block, size_t rows);
 
+    Status _partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t 
rows);
+
     TupleDescriptor* _output_tuple_desc = nullptr;
 
     std::map<std::pair<int, int>, DecimalV2Value> _max_decimalv2_val;
@@ -105,6 +109,7 @@ private:
     std::optional<size_t> _auto_inc_col_idx;
     std::shared_ptr<AutoIncIDBuffer> _auto_inc_id_buffer = nullptr;
     AutoIncIDAllocator _auto_inc_id_allocator;
+    bool _is_partial_update_and_auto_inc = false;
 };
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 3d97f71d7df..b858243fd91 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1186,8 +1186,10 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     }
 
     _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
+    _block_convertor->init_autoinc_info(
+            _schema->db_id(), _schema->table_id(), _state->batch_size(),
+            _schema->is_partial_update() && 
!_schema->auto_increment_coulumn().empty(),
+            _schema->auto_increment_column_unique_id());
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 58639c152f7..5e8b57ee029 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -214,8 +214,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     }
 
     _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
+    _block_convertor->init_autoinc_info(
+            _schema->db_id(), _schema->table_id(), _state->batch_size(),
+            _schema->is_partial_update() && 
!_schema->auto_increment_coulumn().empty(),
+            _schema->auto_increment_column_unique_id());
     _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
 
     // add all counter
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index f723e9dc15b..7cc316c350f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -322,6 +322,7 @@ public class OlapTableSink extends DataSink {
             for (Column col : table.getFullSchema()) {
                 if (col.isAutoInc()) {
                     schemaParam.setAutoIncrementColumn(col.getName());
+                    
schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId());
                 }
             }
         }
diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto
index 9d6945becc0..13c069f414f 100644
--- a/gensrc/proto/descriptors.proto
+++ b/gensrc/proto/descriptors.proto
@@ -72,5 +72,6 @@ message POlapTableSchemaParam {
     optional string auto_increment_column = 10;
     optional int64 timestamp_ms = 11 [default = 0];
     optional string timezone = 12;
+    optional int32 auto_increment_column_unique_id = 13;
 };
 
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 7f1e70d1b20..2d8390e2667 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -245,6 +245,7 @@ struct TOlapTableSchemaParam {
     9: optional list<string> partial_update_input_columns
     10: optional bool is_strict_mode = false
     11: optional string auto_increment_column
+    12: optional i32 auto_increment_column_unique_id = -1
 }
 
 struct TTabletLocation {
diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
new file mode 100644
index 00000000000..57d16693a92
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out
@@ -0,0 +1,97 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1_1 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select1_2 --
+
+-- !select1_3 --
+Bob    123
+Alice  200
+Tom    323
+Test   400
+Carter 523
+Smith  600
+Beata  723
+Doris  800
+Nereids        923
+
+-- !select1_4 --
+
+-- !select2_1 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select2_2 --
+
+-- !select2_3 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select2_4 --
+
+-- !select3_1 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select3_2 --
+
+-- !select3_3 --
+Alice  200
+Test   400
+Smith  600
+Doris  800
+Bob    9990
+Tom    9992
+Carter 9994
+Beata  9996
+Nereids        9998
+
+-- !select3_4 --
+
+-- !select3_5 --
+Alice  200
+Test   400
+Smith  600
+Doris  800
+Bob    9990
+BBBBob 9990
+Tom    9992
+TTTTom 9992
+Carter 9994
+CCCCarter      9994
+BBBBeata       9996
+Beata  9996
+NNNNereids     9998
+Nereids        9998
+
+-- !select3_6 --
+
diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
new file mode 100644
index 00000000000..b48b8ab73c8
--- /dev/null
+++ 
b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out
@@ -0,0 +1,95 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1_1 --
+1      Bob     100
+2      Alice   200
+3      Tom     300
+4      Test    400
+5      Carter  500
+6      Smith   600
+7      Beata   700
+8      Doris   800
+9      Nereids 900
+
+-- !select1_1 --
+Bob    123
+Alice  200
+Tom    323
+Test   400
+Carter 523
+Smith  600
+Beata  723
+Doris  800
+Nereids        923
+
+-- !select1_2 --
+
+-- !select2_1 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select2_2 --
+
+-- !select2_3 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select2_4 --
+
+-- !select3_1 --
+Bob    100
+Alice  200
+Tom    300
+Test   400
+Carter 500
+Smith  600
+Beata  700
+Doris  800
+Nereids        900
+
+-- !select3_2 --
+
+-- !select3_3 --
+Alice  200
+Test   400
+Smith  600
+Doris  800
+Bob    9990
+Tom    9992
+Carter 9994
+Beata  9996
+Nereids        9998
+
+-- !select3_4 --
+
+-- !select3_5 --
+Alice  200
+Test   400
+Smith  600
+Doris  800
+Bob    9990
+BBob   9990
+TTom   9992
+Tom    9992
+CCarter        9994
+Carter 9994
+BBeata 9996
+Beata  9996
+Nereids        9998
+NNereids       9998
+
+-- !select3_6 --
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
new file mode 100644
index 00000000000..e0e86c7c6b6
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_partial_update_correct_insert") {
+
+    // Count the backends that are alive and not decommissioned. The result is
+    // used as "replication_num" by every CREATE TABLE in this suite.
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    for (def be : backends) {
+        if (be.Alive.toBoolean() && !be.SystemDecommissioned.toBoolean()) {
+            replicaNum++
+        }
+    }
+    // The tables cannot be created without at least one usable backend.
+    assertTrue(replicaNum > 0)
+
+    // Runs the same ordered `select id` query 30 times in total (one baseline
+    // plus 29 repeats) and fails as soon as two consecutive results differ.
+    // NOTE(review): presumably repeated reads can be served by different
+    // replicas, which is how replica divergence would surface -- confirm.
+    def check_data_correct = { def tableName ->
+        def old_result = sql "select id from ${tableName} order by id;"
+        logger.info("first result: " + old_result)
+        for (int i = 1; i<30; ++i){
+            def new_result = sql "select id from ${tableName} order by id;"
+            logger.info("new result: " + new_result)
+            // A differing row count is already an inconsistency; checking it
+            // first also keeps the per-row comparison from indexing past the
+            // end of a shorter result.
+            boolean consistent = old_result.size() == new_result.size()
+            for (int j = 0; consistent && j<old_result.size(); ++j){
+                if (old_result[j][0]!=new_result[j][0]){
+                    consistent = false
+                }
+            }
+            if (!consistent){
+                logger.info("table name: " + tableName)
+                logger.info("old result: " + old_result)
+                logger.info("new result: " + new_result)
+                assertTrue(false)
+            }
+            old_result = new_result
+        }
+    }
+    
+    // test for partial update, auto inc col is key
+    // Case 1: `id` is both the UNIQUE KEY and the AUTO_INCREMENT column.
+    def table1 = "unique_auto_inc_col_key_partial_update_insert"
+    sql "drop table if exists ${table1}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table1}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+
+    // Baseline rows; the statement lists only (name, value), `id` is omitted.
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table1} (name, value) values 
('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)"
+
+    qt_select1_1 "select name, value from ${table1} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select1_2 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    
+    // Switch the session to partial-column update with non-strict inserts.
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+
+    // Partial update: only (id, value) are supplied, `name` is not.
+    // 1, 123
+    // 3, 323
+    // 5, 523
+    // 7, 723
+    // 9, 923
+    sql "insert into ${table1} (id, value) values 
(1,123),(3,323),(5,523),(7,723),(9,923)"
+    qt_select1_3 "select name, value from ${table1} order by value;"
+    qt_select1_4 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    sql "drop table if exists ${table1};"
+
+    // test for partial update, auto inc col is value, update auto inc col
+    // Case 2: `name` is the UNIQUE KEY; `id` is an AUTO_INCREMENT value column.
+    def table2 = "unique_auto_inc_col_value_partial_update_insert"
+    sql "drop table if exists ${table2}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table2}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    
+    // Partial-update mode is switched on before the baseline rows are written,
+    // so the single-row inserts below (which omit `id`) run in that mode.
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table2} (name, value) values ('Bob',100)"
+    sql "insert into ${table2} (name, value) values ('Alice',200)"
+    sql "insert into ${table2} (name, value) values ('Tom',300)"
+    sql "insert into ${table2} (name, value) values ('Test',400)"
+    sql "insert into ${table2} (name, value) values ('Carter',500)"
+    sql "insert into ${table2} (name, value) values ('Smith',600)"
+    sql "insert into ${table2} (name, value) values ('Beata',700)"
+    sql "insert into ${table2} (name, value) values ('Doris',800)"
+    sql "insert into ${table2} (name, value) values ('Nereids',900)"
+    qt_select2_1 "select name, value from ${table2} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select2_2 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+
+    // Same session settings as above, re-applied before the update step.
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+    // Partial update that writes the AUTO_INCREMENT column explicitly:
+    // only (name, id) are supplied, `value` is not.
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    sql "insert into ${table2} (name, id) values 
('Bob',9990),('Tom',9992),('Carter',9994),('Beata',9996),('Nereids',9998)"
+    qt_select2_3 "select name, value from ${table2} order by value;"
+    qt_select2_4 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+    sql "drop table if exists ${table2};"
+
+    // test for partial update, auto inc col is value, update other col
+    // Case 3: `name` is the UNIQUE KEY; `id` is an AUTO_INCREMENT value column
+    // that is never supplied by the statements below.
+    // NOTE(review): this is the same table name string as `table2`; that is
+    // safe only because table2 is dropped before this case starts. Every
+    // statement in this case refers to `table3`, so the case keeps working if
+    // the two names are ever made distinct.
+    def table3 = "unique_auto_inc_col_value_partial_update_insert"
+    sql "drop table if exists ${table3}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table3}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    // Baseline rows are written with partial update off and strict mode on.
+    sql "set enable_unique_key_partial_update=false;"
+    sql "set enable_insert_strict=true;"
+
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    sql "insert into ${table3} (name, value) values 
('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)"
+    qt_select3_1 "select name, value from ${table3} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select3_2 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    // Switch to partial-column update for the remaining statements.
+    sql "set enable_unique_key_partial_update=true;"
+    sql "set enable_insert_strict=false;"
+    // Partial update of `value` for keys that were loaded above.
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    sql "insert into ${table3} (name, value) values ('Bob',9990)"
+    sql "insert into ${table3} (name, value) values ('Tom',9992)"
+    sql "insert into ${table3} (name, value) values ('Carter',9994)"
+    sql "insert into ${table3} (name, value) values ('Beata',9996)"
+    sql "insert into ${table3} (name, value) values ('Nereids',9998)"
+    qt_select3_3 "select name, value from ${table3} order by value;"
+    qt_select3_4 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    // Partial update with keys that were not loaded above; `id` is still not
+    // supplied by the statement.
+    sql "insert into ${table3} (name, value) values ('BBBBob',9990)"
+    sql "insert into ${table3} (name, value) values ('TTTTom',9992)"
+    sql "insert into ${table3} (name, value) values ('CCCCarter',9994)"
+    sql "insert into ${table3} (name, value) values ('BBBBeata',9996)"
+    sql "insert into ${table3} (name, value) values ('NNNNereids',9998)"
+    qt_select3_5 "select name, value from ${table3} order by value;"
+    qt_select3_6 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    sql "drop table if exists ${table3};"
+}
+
diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
new file mode 100644
index 00000000000..70a84ee971d
--- /dev/null
+++ 
b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_partial_update_correct_stream_load") {
+
+    // Count the backends that are alive and not decommissioned. The result is
+    // used as "replication_num" by every CREATE TABLE in this suite.
+    def backends = sql_return_maparray('show backends')
+    def replicaNum = 0
+    for (def be : backends) {
+        if (be.Alive.toBoolean() && !be.SystemDecommissioned.toBoolean()) {
+            replicaNum++
+        }
+    }
+    // The tables cannot be created without at least one usable backend.
+    assertTrue(replicaNum > 0)
+    
+    // Runs the same ordered `select id` query 30 times in total (one baseline
+    // plus 29 repeats) and fails as soon as two consecutive results differ.
+    // NOTE(review): presumably repeated reads can be served by different
+    // replicas, which is how replica divergence would surface -- confirm.
+    def check_data_correct = { def tableName ->
+        def old_result = sql "select id from ${tableName} order by id;"
+        logger.info("first result: " + old_result)
+        for (int i = 1; i<30; ++i){
+            def new_result = sql "select id from ${tableName} order by id;"
+            logger.info("new result: " + new_result)
+            // A differing row count is already an inconsistency; checking it
+            // first also keeps the per-row comparison from indexing past the
+            // end of a shorter result.
+            boolean consistent = old_result.size() == new_result.size()
+            for (int j = 0; consistent && j<old_result.size(); ++j){
+                if (old_result[j][0]!=new_result[j][0]){
+                    consistent = false
+                }
+            }
+            if (!consistent){
+                logger.info("table name: " + tableName)
+                logger.info("old result: " + old_result)
+                logger.info("new result: " + new_result)
+                assertTrue(false)
+            }
+            old_result = new_result
+        }
+    }
+    // test for partial update, auto inc col is key
+    // Case 1: `id` is both the UNIQUE KEY and the AUTO_INCREMENT column.
+    def table1 = "unique_auto_inc_col_key_partial_update_stream_load"
+    sql "drop table if exists ${table1}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table1}` (
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    // Baseline load: the file is mapped to (name, value) only; `id` is not
+    // among the loaded columns and `partial_columns` is not set.
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    streamLoad {
+        table "${table1}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select1_1 "select * from ${table1} order by id;"
+
+    // Partial-column load: only (id, value) are supplied.
+    // 1, 123
+    // 3, 323
+    // 5, 523
+    // 7, 723
+    // 9, 923
+    streamLoad {
+        table "${table1}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'id, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update1.csv'
+        time 10000
+    }
+    sql "sync"
+    // NOTE(review): the tag `select1_1` is used a second time here; the
+    // expected-output file carries two `select1_1` sections to match, so a
+    // rename would need the .out file regenerated as well.
+    qt_select1_1 "select name, value from ${table1} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select1_2 "select id, count(*) from ${table1} group by id having 
count(*) > 1;"
+    check_data_correct(table1)
+    sql "drop table if exists ${table1};"
+
+    // test for partial update, auto inc col is value, update auto inc col
+    // Case 2: `name` is the UNIQUE KEY; `id` is an AUTO_INCREMENT value column.
+    def table2 = "unique_auto_inc_col_value_partial_update_stream_load"
+    sql "drop table if exists ${table2}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table2}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+
+    // Baseline load: the file is mapped to (name, value) only; `id` is not
+    // among the loaded columns and `partial_columns` is not set.
+    // Bob, 100
+    // Alice, 200
+    // Tom, 300
+    // Test, 400
+    // Carter, 500
+    // Smith, 600
+    // Beata, 700
+    // Doris, 800
+    // Nereids, 900
+    streamLoad {
+        table "${table2}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select2_1 "select name, value from ${table2} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select2_2 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+
+    // Partial-column load that writes the AUTO_INCREMENT column explicitly:
+    // the file is mapped to (name, id), `value` is not supplied.
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    streamLoad {
+        table "${table2}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, id'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update2.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select2_3 "select name, value from ${table2} order by value;"
+    qt_select2_4 "select id, count(*) from ${table2} group by id having 
count(*) > 1;"
+    check_data_correct(table2)
+    sql "drop table if exists ${table2};"
+
+    // test for partial update, auto inc col is value, update other col
+    // Case 3: `name` is the UNIQUE KEY; `id` is an AUTO_INCREMENT value column
+    // that is never among the loaded columns below.
+    // NOTE(review): this is the same table name string as `table2`; that is
+    // safe only because table2 is dropped before this case starts.
+    def table3 = "unique_auto_inc_col_value_partial_update_stream_load"
+    sql "drop table if exists ${table3}"
+    sql """
+        CREATE TABLE IF NOT EXISTS `${table3}` (
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `value` int(11) NOT NULL COMMENT "用户得分",
+          `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`name`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`name`) BUCKETS 3
+        PROPERTIES (
+        "replication_num" = "${replicaNum}",
+        "in_memory" = "false",
+        "storage_format" = "V2",
+        "enable_unique_key_merge_on_write" = "true"
+        )
+    """
+    // Baseline load of (name, value) without `partial_columns`.
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+
+        file 'auto_inc_basic.csv'
+        time 10000 // limit inflight 10s
+    }
+    sql "sync"
+    qt_select3_1 "select name, value from ${table3} order by value;"
+    // Any row returned by this query means an `id` value occurs more than once.
+    qt_select3_2 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+
+    // Partial-column load of (name, value); per the list below the keys are
+    // ones already present from the baseline load.
+    // Bob, 9990
+    // Tom, 9992
+    // Carter, 9994
+    // Beata, 9996
+    // Nereids, 9998
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update2.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select3_3 "select name, value from ${table3} order by value;"
+    qt_select3_4 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    // Partial-column load of (name, value); per the list below the keys are
+    // new ones that the baseline load did not contain.
+    // BBob, 9990
+    // TTom, 9992
+    // CCarter, 9994
+    // BBeata, 9996
+    // NNereids, 9998
+    streamLoad {
+        table "${table3}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', 'name, value'
+        set 'partial_columns', 'true'
+
+        file 'auto_inc_partial_update3.csv'
+        time 10000
+    }
+    sql "sync"
+    qt_select3_5 "select name, value from ${table3} order by value;"
+    qt_select3_6 "select id, count(*) from ${table3} group by id having 
count(*) > 1;"
+    check_data_correct(table3)
+    sql "drop table if exists ${table3};"
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to