This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new db88a5937d8 [Fix](auto inc) Fix multiple replica partial update auto inc data inconsistency problem (#34788)
db88a5937d8 is described below

commit db88a5937d8649d452e2e84ae73ee560506d32eb
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed May 15 17:19:54 2024 +0800

    [Fix](auto inc) Fix multiple replica partial update auto inc data inconsistency problem (#34788)

    * **Problem:** For tables with an auto-increment column, partial-column updates
      can leave replicas with inconsistent data.

      **Cause:** Previously, partial-column updates on such tables generated the
      auto-increment values independently on each BE (Backend), so different BEs
      could produce different values for the same row.

      **Solution:** Before distributing blocks, check whether the load is a
      partial-column update on a table with an auto-increment column. If so, append
      the auto-increment column as the last column of the block. After distribution,
      each BE checks whether the key of the partially updated row already exists:
      if it does, the existing auto-increment value is reused; if not, the value
      from the appended last column of the block is used. This ensures that the
      auto-incremen [...]

    * 2
---
 be/src/exec/tablet_info.cpp                        |   3 +
 be/src/exec/tablet_info.h                          |   2 +
 be/src/olap/delta_writer_v2.cpp                    |   3 +-
 be/src/olap/memtable.cpp                           |  13 +-
 be/src/olap/memtable.h                             |   2 +
 be/src/olap/partial_update_info.h                  |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  30 +--
 be/src/olap/rowset/segment_v2/segment_writer.h     |   7 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  27 +--
 .../rowset/segment_v2/vertical_segment_writer.h    |   6 +-
 be/src/olap/rowset_builder.cpp                     |   3 +-
 be/src/runtime/runtime_state.h                     |   7 +
 be/src/vec/sink/vtablet_block_convertor.cpp        |  49 +++-
 be/src/vec/sink/vtablet_block_convertor.h          |   7 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   6 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   6 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   1 +
 gensrc/proto/descriptors.proto                     |   1 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 ..._auto_inc_partial_update_consistency_insert.out |  97 ++++++++
 ..._inc_partial_update_consistency_stream_load.out |  95 ++++++++
 ...to_inc_partial_update_consistency_insert.groovy | 221 ++++++++++++++++++
 ...c_partial_update_consistency_stream_load.groovy | 255 +++++++++++++++++++++
 23 files changed, 788 insertions(+), 64 deletions(-)
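[Editor's illustration] A condensed, self-contained sketch of the flow described in the commit message. This is not Doris code: the toy Block, the id source, and the key lookup are simplified stand-ins for vectorized::Block, AutoIncIDBuffer and the primary-key check; only the column name __PARTIAL_UPDATE_AUTO_INC_COLUMN__ and the split between the sink side and the BE side are taken from the patch.

// Build with any C++17 compiler, e.g. g++ -std=c++17 sketch.cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Column name taken from the patch; the sink appends it once, before fan-out.
static const std::string kAutoIncCol = "__PARTIAL_UPDATE_AUTO_INC_COLUMN__";

// Toy stand-in for a block: one string key column plus named int64 columns.
struct Block {
    std::vector<std::string> keys;
    std::map<std::string, std::vector<int64_t>> int_cols;
};

// Sink side: allocate one id per row from a single source and append the ids as
// the last column of the block, so every replica receives identical values.
void append_auto_inc_column(Block& block, int64_t& next_id) {
    std::vector<int64_t> ids;
    ids.reserve(block.keys.size());
    for (size_t i = 0; i < block.keys.size(); ++i) {
        ids.push_back(next_id++);
    }
    block.int_cols[kAutoIncCol] = std::move(ids);
}

// BE side: when the auto-increment column is missing from a partial update,
// reuse the stored value if the key already exists, otherwise take the value
// shipped in the appended column. No per-BE allocation happens here anymore.
void fill_missing_auto_inc(const Block& block, std::map<std::string, int64_t>& stored) {
    const std::vector<int64_t>& shipped = block.int_cols.at(kAutoIncCol);
    for (size_t row = 0; row < block.keys.size(); ++row) {
        const std::string& key = block.keys[row];
        auto it = stored.find(key);
        stored[key] = (it != stored.end()) ? it->second : shipped[row];
    }
}

int main() {
    // "Bob" already has id 1 on this replica; "NewRow" is a brand-new key.
    std::map<std::string, int64_t> replica = {{"Bob", 1}};
    Block block{{"Bob", "NewRow"}, {}};

    int64_t next_id = 100;  // pretend the buffer handed out the range starting at 100
    append_auto_inc_column(block, next_id);
    fill_missing_auto_inc(block, replica);

    for (const auto& [name, id] : replica) {
        std::cout << name << " -> " << id << "\n";
    }
    // Prints Bob -> 1 and NewRow -> 100 on every replica, because 100 travelled
    // with the block instead of being generated locally on each BE.
    return 0;
}

The point of the sketch: the id for a new key is decided once, before the block is distributed, rather than once per BE, which is what removed the replica divergence.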
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index ff8c272fb22..62ff0b2fcce 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -121,6 +121,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
     _is_strict_mode = pschema.is_strict_mode();
     if (_is_partial_update) {
         _auto_increment_column = pschema.auto_increment_column();
+        _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
     }
     _timestamp_ms = pschema.timestamp_ms();
     _timezone = pschema.timezone();
@@ -186,6 +187,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
     }
     if (_is_partial_update) {
         _auto_increment_column = tschema.auto_increment_column;
+        _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
     }
 
     for (const auto& tcolumn : tschema.partial_update_input_columns) {
@@ -258,6 +260,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
     pschema->set_partial_update(_is_partial_update);
     pschema->set_is_strict_mode(_is_strict_mode);
     pschema->set_auto_increment_column(_auto_increment_column);
+    pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
     pschema->set_timestamp_ms(_timestamp_ms);
     pschema->set_timezone(_timezone);
     for (auto col : _partial_update_input_columns) {
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 20f4fa51fc6..fcba8fd8262 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -93,6 +93,7 @@ public:
         return _partial_update_input_columns;
     }
     std::string auto_increment_coulumn() const { return _auto_increment_column; }
+    int32_t auto_increment_column_unique_id() const { return _auto_increment_column_unique_id; }
     void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
     int64_t timestamp_ms() const { return _timestamp_ms; }
     void set_timezone(std::string timezone) { _timezone = timezone; }
@@ -113,6 +114,7 @@ private:
     std::set<std::string> _partial_update_input_columns;
     bool _is_strict_mode = false;
     std::string _auto_increment_column;
+    int32_t _auto_increment_column_unique_id;
     int64_t _timestamp_ms = 0;
     std::string _timezone;
 };
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 164ee894e32..34c03fee95e 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -238,7 +238,8 @@ void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
     _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
                                table_schema_param->partial_update_input_columns(),
                                table_schema_param->is_strict_mode(),
-                               table_schema_param->timestamp_ms(), table_schema_param->timezone());
+                               table_schema_param->timestamp_ms(), table_schema_param->timezone(),
+                               table_schema_param->auto_increment_coulumn());
 }
 
 } // namespace doris
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 3ee093c2b37..d26036252b4 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -68,16 +68,22 @@ MemTable::MemTable(int64_t tablet_id, const TabletSchema* tablet_schema,
     _query_thread_context.init_unlocked();
     _arena = std::make_unique<vectorized::Arena>();
     _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
-    // TODO: Support ZOrderComparator in the future
-    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _num_columns = _tablet_schema->num_columns();
     if (partial_update_info != nullptr) {
         _is_partial_update = partial_update_info->is_partial_update;
         if (_is_partial_update) {
             _num_columns = partial_update_info->partial_update_input_columns.size();
+            if (partial_update_info->is_schema_contains_auto_inc_column &&
+                !partial_update_info->is_input_columns_contains_auto_inc_column) {
+                _is_partial_update_and_auto_inc = true;
+                _num_columns += 1;
+            }
         }
     }
+    // TODO: Support ZOrderComparator in the future
+    _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
 }
+
 void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
                                                   const TupleDescriptor* tuple_desc) {
     for (auto slot_desc : *slot_descs) {
@@ -89,6 +95,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript
             }
         }
     }
+    if (_is_partial_update_and_auto_inc) {
+        _column_offset.emplace_back(_column_offset.size());
+    }
 }
 
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index c95e38fb05a..8362c69222e 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -265,6 +265,8
@@ private: size_t _num_columns; int32_t _seq_col_idx_in_block = -1; + + bool _is_partial_update_and_auto_inc = false; }; // class MemTable } // namespace doris diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 7a22ecf5035..e08021b4f38 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -24,7 +24,8 @@ namespace doris { struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set<string>& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone) { + int64_t timestamp_ms, const std::string& timezone, + const std::string& auto_increment_column) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; this->timestamp_ms = timestamp_ms; @@ -42,8 +43,13 @@ struct PartialUpdateInfo { } else { update_cids.emplace_back(i); } + if (auto_increment_column == tablet_column.name()) { + is_schema_contains_auto_inc_column = true; + } } this->is_strict_mode = is_strict_mode; + is_input_columns_contains_auto_inc_column = + is_partial_update && partial_update_input_columns.contains(auto_increment_column); } bool is_partial_update {false}; @@ -56,5 +62,7 @@ struct PartialUpdateInfo { bool is_strict_mode {false}; int64_t timestamp_ms {0}; std::string timezone; + bool is_input_columns_contains_auto_inc_column = false; + bool is_schema_contains_auto_inc_column = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 939c504580f..a4c2d988ab9 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -117,13 +117,6 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = - vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()) - .unique_id()); - } // encode the rowid into the primary key index if (!_tablet_schema->cluster_key_idxes().empty()) { const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>(); @@ -559,7 +552,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, block)); full_block.set_columns(std::move(mutable_full_columns)); // row column should be filled here if (_tablet_schema->store_row_column()) { @@ -618,7 +611,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, + const vectorized::Block* block) { if (config::is_cloud_mode()) { // TODO(plat1ko): cloud mode return Status::NotSupported("fill_missing_columns"); @@ -712,18 +706,6 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_f } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector<std::pair<int64_t, size_t>> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -751,7 +733,11 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast<vectorized::ColumnInt64*>( mutable_full_columns[cids_missing[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast<const vectorized::ColumnInt64*>( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index d1d38f7773f..2b04f36ac2d 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -41,7 +41,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -132,7 +131,8 @@ public: void set_mow_context(std::shared_ptr<MowContext> mow_context); Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); @@ -228,9 +228,6 @@ private: // group every rowset-segment row id to speed up reader PartialUpdateReadPlan _rssid_to_rid; std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset; - - std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 7466f7861c5..448a2b7304c 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -107,11 +107,6 @@ VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32 const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); _seq_coder = get_key_coder(column.type()); } - if (!_tablet_schema->auto_increment_column().empty()) { - _auto_inc_id_buffer = vectorized::GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( - _tablet_schema->db_id(), _tablet_schema->table_id(), - _tablet_schema->column(_tablet_schema->auto_increment_column()).unique_id()); - } if (_tablet_schema->has_inverted_index()) { _inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>( fs ? 
fs : io::global_local_filesystem(), _file_writer->path().parent_path(), @@ -492,7 +487,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // read and fill block auto mutable_full_columns = full_block.mutate_columns(); RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos)); + has_default_or_nullable, segment_start_pos, data.block)); // row column should be filled here if (_tablet_schema->store_row_column()) { // convert block to row store format @@ -551,7 +546,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da Status VerticalSegmentWriter::_fill_missing_columns( vectorized::MutableColumns& mutable_full_columns, const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos) { + const size_t& segment_start_pos, const vectorized::Block* block) { auto tablet = static_cast<Tablet*>(_tablet.get()); // create old value columns const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids; @@ -640,18 +635,6 @@ Status VerticalSegmentWriter::_fill_missing_columns( } } - // deal with partial update auto increment column when there no key in old block. - if (!_tablet_schema->auto_increment_column().empty()) { - if (_auto_inc_id_allocator.total_count < use_default_or_null_flag.size()) { - std::vector<std::pair<int64_t, size_t>> res; - RETURN_IF_ERROR( - _auto_inc_id_buffer->sync_request_ids(use_default_or_null_flag.size(), &res)); - for (auto [start, length] : res) { - _auto_inc_id_allocator.insert_ids(start, length); - } - } - } - // fill all missing value from mutable_old_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { // `use_default_or_null_flag[idx] == true` doesn't mean that we should read values from the old row @@ -679,7 +662,11 @@ Status VerticalSegmentWriter::_fill_missing_columns( FieldType::OLAP_FIELD_TYPE_BIGINT); auto auto_inc_column = assert_cast<vectorized::ColumnInt64*>( mutable_full_columns[missing_cids[i]].get()); - auto_inc_column->insert(_auto_inc_id_allocator.next_id()); + auto_inc_column->insert( + (assert_cast<const vectorized::ColumnInt64*>( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") + .column.get())) + ->get_element(idx)); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. 
It means that the row's delete sign is marked, and the value diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index ac328063b35..6c853aa185f 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -38,7 +38,6 @@ #include "olap/tablet_schema.h" #include "util/faststring.h" #include "util/slice.h" -#include "vec/sink/autoinc_buffer.h" namespace doris { namespace vectorized { @@ -147,7 +146,8 @@ private: Status _append_block_with_partial_content(RowsInBlock& data); Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block); private: uint32_t _segment_id; @@ -195,8 +195,6 @@ private: std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset; std::vector<RowsInBlock> _batched_blocks; - std::shared_ptr<vectorized::AutoIncIDBuffer> _auto_inc_id_buffer = nullptr; - vectorized::AutoIncIDAllocator _auto_inc_id_allocator; }; } // namespace segment_v2 diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 9d8b8163b71..7fdaa41cc74 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -383,7 +383,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + table_schema_param->auto_increment_coulumn()); } } // namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 3f06df6c0b9..f2e2c887571 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -43,6 +43,7 @@ #include "runtime/task_execution_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" +#include "vec/columns/columns_number.h" namespace doris { class IRuntimeFilter; @@ -628,6 +629,10 @@ public: int task_num() const { return _task_num; } + vectorized::ColumnInt64* partial_update_auto_inc_column() { + return _partial_update_auto_inc_column; + }; + private: Status create_error_log_file(); @@ -757,6 +762,8 @@ private: // prohibit copies RuntimeState(const RuntimeState&); + + vectorized::ColumnInt64* _partial_update_auto_inc_column; }; #define RETURN_IF_CANCELLED(state) \ diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index 678c899d980..d93a654728d 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -19,6 +19,7 @@ #include <fmt/format.h> #include <gen_cpp/FrontendService.h> +#include <glog/logging.h> #include <google/protobuf/stubs/common.h> #include <algorithm> @@ -46,6 +47,7 @@ #include "vec/common/assert_cast.h" #include "vec/core/block.h" #include "vec/core/types.h" +#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" @@ -66,8 +68,21 @@ Status OlapTableBlockConvertor::validate_and_convert_block( output_vexpr_ctxs, *input_block, block.get())); } - // fill the valus for 
auto-increment columns - if (_auto_inc_col_idx.has_value()) { + if (_is_partial_update_and_auto_inc) { + // If this load is partial update and this table has a auto inc column, + // e.g. table schema: k1, v1, v2(auto inc) + // 1. insert columns include auto inc column + // e.g. insert into table (k1, v2) value(a, 1); + // we do nothing. + // 2. insert columns do not include auto inc column + // e.g. insert into table (k1, v1) value(a, a); + // we need to fill auto_inc_cols by creating a new column. + if (!_auto_inc_col_idx.has_value()) { + RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows)); + } + } else if (_auto_inc_col_idx.has_value()) { + // fill the valus for auto-increment columns + DCHECK_EQ(_is_partial_update_and_auto_inc, false); RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); } @@ -91,8 +106,16 @@ Status OlapTableBlockConvertor::validate_and_convert_block( return Status::OK(); } -void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size) { +void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc, + int32_t auto_increment_column_unique_id) { _batch_size = batch_size; + if (is_partial_update_and_auto_inc) { + _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc; + _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( + db_id, table_id, auto_increment_column_unique_id); + return; + } for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) { if (_output_tuple_desc->slots()[idx]->is_auto_increment()) { _auto_inc_col_idx = idx; @@ -522,4 +545,24 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si return Status::OK(); } +Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* block, + size_t rows) { + auto dst_column = vectorized::ColumnInt64::create(); + vectorized::ColumnInt64::Container& dst_values = dst_column->get_data(); + size_t null_value_count = rows; + std::vector<std::pair<int64_t, size_t>> res; + RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); + for (auto [start, length] : res) { + _auto_inc_id_allocator.insert_ids(start, length); + } + + for (size_t i = 0; i < rows; i++) { + dst_values.emplace_back(_auto_inc_id_allocator.next_id()); + } + block->insert(vectorized::ColumnWithTypeAndName(std::move(dst_column), + std::make_shared<DataTypeNumber<Int64>>(), + "__PARTIAL_UPDATE_AUTO_INC_COLUMN__")); + return Status::OK(); +} + } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 4eaaef3869c..0db340ce6c2 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -53,7 +53,9 @@ public: int64_t num_filtered_rows() const { return _num_filtered_rows; } - void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size); + void init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, + bool is_partial_update_and_auto_inc = false, + int32_t auto_increment_column_unique_id = -1); AutoIncIDAllocator& auto_inc_id_allocator() { return _auto_inc_id_allocator; } @@ -82,6 +84,8 @@ private: Status _fill_auto_inc_cols(vectorized::Block* block, size_t rows); + Status _partial_update_fill_auto_inc_cols(vectorized::Block* block, size_t rows); + TupleDescriptor* _output_tuple_desc = nullptr; std::map<std::pair<int, int>, DecimalV2Value> 
_max_decimalv2_val; @@ -105,6 +109,7 @@ private: std::optional<size_t> _auto_inc_col_idx; std::shared_ptr<AutoIncIDBuffer> _auto_inc_id_buffer = nullptr; AutoIncIDAllocator _auto_inc_id_allocator; + bool _is_partial_update_and_auto_inc = false; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 3d97f71d7df..b858243fd91 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1186,8 +1186,10 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 58639c152f7..5e8b57ee029 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -214,8 +214,10 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc); - _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), - _state->batch_size()); + _block_convertor->init_autoinc_info( + _schema->db_id(), _schema->table_id(), _state->batch_size(), + _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), + _schema->auto_increment_column_unique_id()); _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); // add all counter diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index f723e9dc15b..7cc316c350f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -322,6 +322,7 @@ public class OlapTableSink extends DataSink { for (Column col : table.getFullSchema()) { if (col.isAutoInc()) { schemaParam.setAutoIncrementColumn(col.getName()); + schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId()); } } } diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 9d6945becc0..13c069f414f 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -72,5 +72,6 @@ message POlapTableSchemaParam { optional string auto_increment_column = 10; optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; + optional int32 auto_increment_column_unique_id = 13; }; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 7f1e70d1b20..2d8390e2667 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -245,6 +245,7 @@ struct TOlapTableSchemaParam { 9: optional list<string> partial_update_input_columns 10: optional bool is_strict_mode = false 11: optional string auto_increment_column + 12: optional i32 auto_increment_column_unique_id = -1 } struct TTabletLocation { diff --git 
a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out new file mode 100644 index 00000000000..57d16693a92 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.out @@ -0,0 +1,97 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1_1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select1_2 -- + +-- !select1_3 -- +Bob 123 +Alice 200 +Tom 323 +Test 400 +Carter 523 +Smith 600 +Beata 723 +Doris 800 +Nereids 923 + +-- !select1_4 -- + +-- !select2_1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select2_2 -- + +-- !select2_3 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select2_4 -- + +-- !select3_1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Test 400 +Smith 600 +Doris 800 +Bob 9990 +Tom 9992 +Carter 9994 +Beata 9996 +Nereids 9998 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +Test 400 +Smith 600 +Doris 800 +Bob 9990 +BBBBob 9990 +Tom 9992 +TTTTom 9992 +Carter 9994 +CCCCarter 9994 +BBBBeata 9996 +Beata 9996 +NNNNereids 9998 +Nereids 9998 + +-- !select3_6 -- + diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out new file mode 100644 index 00000000000..b48b8ab73c8 --- /dev/null +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.out @@ -0,0 +1,95 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select1_1 -- +1 Bob 100 +2 Alice 200 +3 Tom 300 +4 Test 400 +5 Carter 500 +6 Smith 600 +7 Beata 700 +8 Doris 800 +9 Nereids 900 + +-- !select1_1 -- +Bob 123 +Alice 200 +Tom 323 +Test 400 +Carter 523 +Smith 600 +Beata 723 +Doris 800 +Nereids 923 + +-- !select1_2 -- + +-- !select2_1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select2_2 -- + +-- !select2_3 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select2_4 -- + +-- !select3_1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !select3_2 -- + +-- !select3_3 -- +Alice 200 +Test 400 +Smith 600 +Doris 800 +Bob 9990 +Tom 9992 +Carter 9994 +Beata 9996 +Nereids 9998 + +-- !select3_4 -- + +-- !select3_5 -- +Alice 200 +Test 400 +Smith 600 +Doris 800 +Bob 9990 +BBob 9990 +TTom 9992 +Tom 9992 +CCarter 9994 +Carter 9994 +BBeata 9996 +Beata 9996 +Nereids 9998 +NNereids 9998 + +-- !select3_6 -- + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy new file mode 100644 index 00000000000..e0e86c7c6b6 --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_insert.groovy @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_unique_table_auto_inc_partial_update_correct_insert") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j<old_result.size();++j){ + if (old_result[j][0]!=new_result[j][0]){ + logger.info("table name: " + tableName) + logger.info("old result: " + old_result) + logger.info("new result: " + new_result) + assertTrue(false) + } + } + old_result = new_result + } + } + + // test for partial update, auto inc col is key + def table1 = "unique_auto_inc_col_key_partial_update_insert" + sql "drop table if exists ${table1}" + sql """ + CREATE TABLE IF NOT EXISTS `${table1}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID", + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table1} (name, value) values ('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)" + + qt_select1_1 "select name, value from ${table1} order by value;" + qt_select1_2 "select id, count(*) from ${table1} group by id having count(*) > 1;" + check_data_correct(table1) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + // 1, 123 + // 3, 323 + // 5, 523 + // 7, 723 + // 9, 923 + sql "insert into ${table1} (id, value) values (1,123),(3,323),(5,523),(7,723),(9,923)" + qt_select1_3 "select name, value from ${table1} order by value;" + qt_select1_4 "select id, count(*) from ${table1} group by id having count(*) > 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100)" + sql "insert into ${table2} (name, value) values ('Alice',200)" + 
sql "insert into ${table2} (name, value) values ('Tom',300)" + sql "insert into ${table2} (name, value) values ('Test',400)" + sql "insert into ${table2} (name, value) values ('Carter',500)" + sql "insert into ${table2} (name, value) values ('Smith',600)" + sql "insert into ${table2} (name, value) values ('Beata',700)" + sql "insert into ${table2} (name, value) values ('Doris',800)" + sql "insert into ${table2} (name, value) values ('Nereids',900)" + qt_select2_1 "select name, value from ${table2} order by value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, id) values ('Bob',9990),('Tom',9992),('Carter',9994),('Beata',9996),('Nereids',9998)" + qt_select2_3 "select name, value from ${table2} order by value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_insert" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + sql "set enable_unique_key_partial_update=false;" + sql "set enable_insert_strict=true;" + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + sql "insert into ${table2} (name, value) values ('Bob',100),('Alice',200),('Tom',300),('Test',400),('Carter',500),('Smith',600),('Beata',700),('Doris',800),('Nereids',900)" + qt_select3_1 "select name, value from ${table3} order by value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + sql "insert into ${table2} (name, value) values ('Bob',9990)" + sql "insert into ${table2} (name, value) values ('Tom',9992)" + sql "insert into ${table2} (name, value) values ('Carter',9994)" + sql "insert into ${table2} (name, value) values ('Beata',9996)" + sql "insert into ${table2} (name, value) values ('Nereids',9998)" + qt_select3_3 "select name, value from ${table3} order by value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + sql "insert into ${table2} (name, value) values ('BBBBob',9990)" + sql "insert into ${table2} (name, value) values ('TTTTom',9992)" + sql "insert into ${table2} (name, value) values ('CCCCarter',9994)" + sql "insert into ${table2} (name, value) values ('BBBBeata',9996)" + sql "insert into ${table2} (name, value) values ('NNNNereids',9998)" + qt_select3_5 "select name, value from ${table3} order by value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) 
> 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} + diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy new file mode 100644 index 00000000000..70a84ee971d --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc_partial_update_consistency_stream_load.groovy @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_auto_inc_partial_update_correct_stream_load") { + + def backends = sql_return_maparray('show backends') + def replicaNum = 0 + def targetBackend = null + for (def be : backends) { + def alive = be.Alive.toBoolean() + def decommissioned = be.SystemDecommissioned.toBoolean() + if (alive && !decommissioned) { + replicaNum++ + targetBackend = be + } + } + assertTrue(replicaNum > 0) + + def check_data_correct = { def tableName -> + def old_result = sql "select id from ${tableName} order by id;" + logger.info("first result: " + old_result) + for (int i = 1; i<30; ++i){ + def new_result = sql "select id from ${tableName} order by id;" + logger.info("new result: " + new_result) + for (int j = 0; j<old_result.size();++j){ + if (old_result[j][0]!=new_result[j][0]){ + logger.info("table name: " + tableName) + logger.info("old result: " + old_result) + logger.info("new result: " + new_result) + assertTrue(false) + } + } + old_result = new_result + } + } + // test for partial update, auto inc col is key + def table1 = "unique_auto_inc_col_key_partial_update_stream_load" + sql "drop table if exists ${table1}" + sql """ + CREATE TABLE IF NOT EXISTS `${table1}` ( + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID", + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + streamLoad { + table "${table1}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select1_1 "select * from ${table1} order by id;" + + // 1, 123 + // 3, 323 + // 5, 523 + // 7, 723 + // 9, 923 + streamLoad { + table "${table1}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'id, value' + set 'partial_columns', 'true' + + 
file 'auto_inc_partial_update1.csv' + time 10000 + } + sql "sync" + qt_select1_1 "select name, value from ${table1} order by value;" + qt_select1_2 "select id, count(*) from ${table1} group by id having count(*) > 1;" + check_data_correct(table1) + sql "drop table if exists ${table1};" + + // test for partial update, auto inc col is value, update auto inc col + def table2 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table2}" + sql """ + CREATE TABLE IF NOT EXISTS `${table2}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + + // Bob, 100 + // Alice, 200 + // Tom, 300 + // Test, 400 + // Carter, 500 + // Smith, 600 + // Beata, 700 + // Doris, 800 + // Nereids, 900 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select2_1 "select name, value from ${table2} order by value;" + qt_select2_2 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table2}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, id' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select2_3 "select name, value from ${table2} order by value;" + qt_select2_4 "select id, count(*) from ${table2} group by id having count(*) > 1;" + check_data_correct(table2) + sql "drop table if exists ${table2};" + + // test for partial update, auto inc col is value, update other col + def table3 = "unique_auto_inc_col_value_partial_update_stream_load" + sql "drop table if exists ${table3}" + sql """ + CREATE TABLE IF NOT EXISTS `${table3}` ( + `name` varchar(65533) NOT NULL COMMENT "用户姓名", + `value` int(11) NOT NULL COMMENT "用户得分", + `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID" + ) ENGINE=OLAP + UNIQUE KEY(`name`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`name`) BUCKETS 3 + PROPERTIES ( + "replication_num" = "${replicaNum}", + "in_memory" = "false", + "storage_format" = "V2", + "enable_unique_key_merge_on_write" = "true" + ) + """ + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + + file 'auto_inc_basic.csv' + time 10000 // limit inflight 10s + } + sql "sync" + qt_select3_1 "select name, value from ${table3} order by value;" + qt_select3_2 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + + // Bob, 9990 + // Tom, 9992 + // Carter, 9994 + // Beata, 9996 + // Nereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update2.csv' + time 10000 + } + sql "sync" + qt_select3_3 "select name, value from ${table3} order by value;" + qt_select3_4 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + // BBob, 9990 + // TTom, 9992 + // CCarter, 9994 + // BBeata, 
9996 + // NNereids, 9998 + streamLoad { + table "${table3}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'name, value' + set 'partial_columns', 'true' + + file 'auto_inc_partial_update3.csv' + time 10000 + } + sql "sync" + qt_select3_5 "select name, value from ${table3} order by value;" + qt_select3_6 "select id, count(*) from ${table3} group by id having count(*) > 1;" + check_data_correct(table3) + sql "drop table if exists ${table3};" +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org