This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new efdc73777a [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101) efdc73777a is described below commit efdc73777a21275b68c52970b7871ad1582e74f5 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Wed Dec 21 09:50:13 2022 +0800 [enhancement](load) verify the number of rows between different replicas when load data to avoid data inconsistency (#15101) It is very difficult to investigate the data inconsistency of multiple replicas. When loading data, the number of rows between replicas is checked to avoid some data inconsistency problems. --- be/src/exec/tablet_sink.cpp | 41 +++++++++++++++++++++++++++++++ be/src/exec/tablet_sink.h | 11 +++++++++ be/src/olap/delta_writer.cpp | 10 ++++++++ be/src/olap/delta_writer.h | 7 ++++++ be/src/olap/memtable.cpp | 2 ++ be/src/olap/memtable.h | 2 ++ be/src/olap/rowset/beta_rowset_writer.cpp | 1 + be/src/vec/sink/vtablet_sink.cpp | 4 +++ gensrc/proto/internal_service.proto | 2 ++ 9 files changed, 80 insertions(+) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 6e835228da..106623fa93 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -247,6 +247,10 @@ Status NodeChannel::open_wait() { commit_info.tabletId = tablet.tablet_id(); commit_info.backendId = _node_id; _tablet_commit_infos.emplace_back(std::move(commit_info)); + if (tablet.has_received_rows()) { + _tablets_received_rows.emplace_back(tablet.tablet_id(), + tablet.received_rows()); + } VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() << ", backendId=" << _node_id @@ -457,6 +461,7 @@ Status NodeChannel::close_wait(RuntimeState* state) { std::make_move_iterator(_tablet_commit_infos.end())); _index_channel->set_error_tablet_in_state(state); + _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); return Status::OK(); } @@ -769,6 +774,39 @@ void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { } } +void IndexChannel::set_tablets_received_rows( + const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id) { + for (const auto& [tablet_id, rows_num] : tablets_received_rows) { + _tablets_received_rows[tablet_id].emplace_back(node_id, rows_num); + } +} + +Status IndexChannel::check_tablet_received_rows_consistency() { + for (auto& tablet : _tablets_received_rows) { + for (size_t i = 0; i < tablet.second.size(); i++) { + VLOG_NOTICE << "check_tablet_received_rows_consistency, load_id: " << _parent->_load_id + << ", txn_id: " << std::to_string(_parent->_txn_id) + << ", tablet_id: " << tablet.first + << ", node_id: " << tablet.second[i].first + << ", rows_num: " << tablet.second[i].second; + if (i == 0) { + continue; + } + if (tablet.second[i].second != tablet.second[0].second) { + LOG(WARNING) << "rows num doest't match, load_id: " << _parent->_load_id + << ", txn_id: " << std::to_string(_parent->_txn_id) + << ", tablt_id: " << tablet.first + << ", node_id: " << tablet.second[i].first + << ", rows_num: " << tablet.second[i].second + << ", node_id: " << tablet.second[0].first + << ", rows_num: " << tablet.second[0].second; + return Status::InternalError("rows num written by multi replicas doest't match"); + } + } + } + return Status::OK(); +} + OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs, Status* status) : _pool(pool), @@ -1155,6 +1193,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { Status index_st = index_channel->check_intolerable_failure(); if (!index_st.ok()) { status = index_st; + } else if (Status st = index_channel->check_tablet_received_rows_consistency(); + !st.ok()) { + status = st; } } // end for index channels } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index a177db8a9b..2216c89fff 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -321,6 +321,8 @@ protected: bool _is_closed = false; RuntimeState* _state; + // rows number received per tablet, tablet_id -> rows_num + std::vector<std::pair<int64_t, int64_t>> _tablets_received_rows; private: std::unique_ptr<RowBatch> _cur_batch; @@ -368,6 +370,12 @@ public: return mem_consumption; } + void set_tablets_received_rows( + const std::vector<std::pair<int64_t, int64_t>>& tablets_received_rows, int64_t node_id); + + // check whether the rows num written by different replicas is consistent + Status check_tablet_received_rows_consistency(); + private: friend class NodeChannel; friend class VNodeChannel; @@ -398,6 +406,9 @@ private: Status _intolerable_failure_status = Status::OK(); std::unique_ptr<MemTracker> _index_channel_tracker; + // rows num received by DeltaWriter per tablet, tablet_id -> <node_Id, rows_num> + // used to verify whether the rows num received by different replicas is consistent + std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_received_rows; }; template <typename Row> diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 6b0cfb3e10..2a0fded006 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -167,6 +167,7 @@ Status DeltaWriter::write(Tuple* tuple) { return _cancel_status; } + _total_received_rows++; _mem_table->insert(tuple); // if memtable is full, push it to the flush executor, @@ -192,6 +193,7 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row return _cancel_status; } + _total_received_rows += row_idxs.size(); for (const auto& row_idx : row_idxs) { _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0)); } @@ -220,6 +222,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> return _cancel_status; } + _total_received_rows += row_idxs.size(); _mem_table->insert(block, row_idxs); if (_mem_table->need_to_agg()) { @@ -237,6 +240,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> } Status DeltaWriter::_flush_memtable_async() { + _merged_rows += _mem_table->merged_rows(); return _flush_token->submit(std::move(_mem_table)); } @@ -358,6 +362,12 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, _mem_table.reset(); + if (_rowset_writer->num_rows() + _merged_rows != _total_received_rows) { + LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " + << _rowset_writer->num_rows() << ", merged_rows: " << _merged_rows + << ", total received rows: " << _total_received_rows; + return Status::InternalError("rows number written by delta writer dosen't match"); + } // use rowset meta manager to save meta _cur_rowset = _rowset_writer->build(); if (_cur_rowset == nullptr) { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 5484c7c7db..dafd77a8f8 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -110,6 +110,8 @@ public: void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed); + int64_t total_received_rows() const { return _total_received_rows; } + private: DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id, bool is_vec); @@ -172,6 +174,11 @@ private: RowsetIdUnorderedSet _rowset_ids; // current max version, used to calculate delete bitmap int64_t _cur_max_version; + + // total rows num written by DeltaWriter + int64_t _total_received_rows = 0; + // rows num merged by memtable + int64_t _merged_rows = 0; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 3881ce8dce..63575a3bb2 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -220,6 +220,7 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) { bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint); if (is_exist) { + _merged_rows++; _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key); } else { row_in_block->init_agg_places( @@ -249,6 +250,7 @@ void MemTable::_insert_agg(const Tuple* tuple) { bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint); if (is_exist) { + _merged_rows++; (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key); } else { tuple_buf = _table_mem_pool->allocate(_schema_size); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index e7f59ff151..27606f3552 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -70,6 +70,7 @@ public: Status close(); int64_t flush_size() const { return _flush_size; } + int64_t merged_rows() const { return _merged_rows; } private: Status _do_flush(int64_t& duration_ns); @@ -202,6 +203,7 @@ private: // This is not the rows in this memtable, because rows may be merged // in unique or aggregate key model. int64_t _rows = 0; + int64_t _merged_rows = 0; void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr; void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row, TableKey row_in_skiplist) = nullptr; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 8da5ca117a..e6ee1833b4 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -661,6 +661,7 @@ Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flus } ContiguousRow dst_row = it.get_current_row(); auto s = writer->append_row(dst_row); + _raw_num_rows_written++; if (PREDICT_FALSE(!s.ok())) { LOG(WARNING) << "failed to append row: " << s.to_string(); return Status::Error<WRITER_DATA_WRITE_ERROR>(); diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index de59e0b5a7..ae799e46d9 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -135,6 +135,10 @@ Status VNodeChannel::open_wait() { commit_info.tabletId = tablet.tablet_id(); commit_info.backendId = _node_id; _tablet_commit_infos.emplace_back(std::move(commit_info)); + if (tablet.has_received_rows()) { + _tablets_received_rows.emplace_back(tablet.tablet_id(), + tablet.received_rows()); + } VLOG_CRITICAL << "master replica commit info: tabletId=" << tablet.tablet_id() << ", backendId=" << _node_id << ", master node id: " << this->node_id() diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ff05509e67..552786f313 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -65,6 +65,8 @@ message PTabletInfo { // Delta Writer will write data to local disk and then check if there are new raw values not in global dict // if appears, then it should add the column name to this vector repeated string invalid_dict_cols = 3; + // total rows num received by DeltaWriter + optional int64 received_rows = 4; } // open a tablet writer --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org