This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd52fa1966 [enhancement](checksum) use vectorized engine in checksum (#15260) bd52fa1966 is described below commit bd52fa1966f09698192d08248df100a7046a3b92 Author: yixiutt <102007456+yixi...@users.noreply.github.com> AuthorDate: Mon Dec 26 10:28:15 2022 +0800 [enhancement](checksum) use vectorized engine in checksum (#15260) --- be/src/olap/reader.cpp | 50 ++++++++++++++++++++++++++++++- be/src/olap/reader.h | 4 +++ be/src/olap/task/engine_checksum_task.cpp | 50 +++++++++++++++++++++++++++++-- be/src/olap/task/engine_checksum_task.h | 1 + 4 files changed, 102 insertions(+), 3 deletions(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 1e42f0a040..7a0df6c66f 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -520,7 +520,8 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { // other reader type: // QUERY will filter the row in query layer to keep right result use where clause. 
// CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset - if (read_params.reader_type == READER_BASE_COMPACTION) { + if (read_params.reader_type == READER_BASE_COMPACTION || + read_params.reader_type == READER_CHECKSUM) { _filter_delete = true; } @@ -528,4 +529,51 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) { read_params.version.second); } +Status TabletReader::init_reader_params_and_create_block( + TabletSharedPtr tablet, ReaderType reader_type, + const std::vector<RowsetSharedPtr>& input_rowsets, + TabletReader::ReaderParams* reader_params, vectorized::Block* block) { + reader_params->tablet = tablet; + reader_params->reader_type = reader_type; + reader_params->version = + Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version()); + + for (auto& rowset : input_rowsets) { + RowsetReaderSharedPtr rs_reader; + RETURN_NOT_OK(rowset->create_reader(&rs_reader)); + reader_params->rs_readers.push_back(std::move(rs_reader)); + } + + std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size()); + std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(), + [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); }); + TabletSchemaSPtr read_tablet_schema = + tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema(); + TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>(); + merge_tablet_schema->copy_from(*read_tablet_schema); + { + std::shared_lock rdlock(tablet->get_header_lock()); + auto& delete_preds = tablet->delete_predicates(); + std::copy(delete_preds.cbegin(), delete_preds.cend(), + std::inserter(reader_params->delete_predicates, + reader_params->delete_predicates.begin())); + } + // Merge the columns in delete predicate that not in latest schema in to current tablet schema + for (auto& del_pred_pb : reader_params->delete_predicates) { + 
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_pb->version())); + } + reader_params->tablet_schema = merge_tablet_schema; + if (tablet->enable_unique_key_merge_on_write()) { + reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + } + + reader_params->return_columns.resize(read_tablet_schema->num_columns()); + std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0); + reader_params->origin_return_columns = &reader_params->return_columns; + + *block = read_tablet_schema->create_block(); + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 473e7697b6..3c4ba1a0d6 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -142,6 +142,10 @@ public: OlapReaderStatistics* mutable_stats() { return &_stats; } virtual bool update_profile(RuntimeProfile* profile) { return false; } + static Status init_reader_params_and_create_block( + TabletSharedPtr tablet, ReaderType reader_type, + const std::vector<RowsetSharedPtr>& input_rowsets, + TabletReader::ReaderParams* reader_params, vectorized::Block* block); protected: friend class vectorized::VCollectIterator; diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index ee1fe11198..6643b5e066 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -18,12 +18,13 @@ #include "olap/task/engine_checksum_task.h" #include "runtime/thread_context.h" +#include "vec/olap/block_reader.h" namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) - : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version) { + : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { _mem_tracker = std::make_shared<MemTrackerLimiter>( MemTrackerLimiter::Type::CONSISTENCY, 
"EngineChecksumTask#tabletId=" + std::to_string(tablet_id)); @@ -38,7 +39,52 @@ Status EngineChecksumTask::_compute_checksum() { LOG(INFO) << "begin to process compute checksum." << "tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash << ", version=" << _version; - return Status::InternalError("Not implemented yet"); + + if (_checksum == nullptr) { + return Status::InvalidArgument("invalid checksum which is nullptr"); + } + + TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(_tablet_id); + if (nullptr == tablet) { + return Status::InternalError("could not find tablet {}", _tablet_id); + } + + std::vector<RowsetSharedPtr> input_rowsets; + Version version(0, _version); + Status acquire_reader_st = tablet->capture_consistent_rowsets(version, &input_rowsets); + if (acquire_reader_st != Status::OK()) { + LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->full_name() + << "res=" << acquire_reader_st; + return acquire_reader_st; + } + vectorized::BlockReader reader; + TabletReader::ReaderParams reader_params; + vectorized::Block block; + RETURN_NOT_OK(TabletReader::init_reader_params_and_create_block( + tablet, READER_CHECKSUM, input_rowsets, &reader_params, &block)) + + auto res = reader.init(reader_params); + if (!res.ok()) { + LOG(WARNING) << "initiate reader fail. res = " << res; + return res; + } + + bool eof = false; + SipHash block_hash; + uint64_t rows = 0; + while (!eof) { + RETURN_IF_ERROR(reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof)); + rows += block.rows(); + + block.update_hash(block_hash); + block.clear_column_data(); + } + uint64_t checksum64 = block_hash.get64(); + *_checksum = (checksum64 >> 32) ^ (checksum64 & 0xffffffff); + + LOG(INFO) << "success to finish compute checksum. 
tablet_id = " << _tablet_id + << ", rows = " << rows << ", checksum=" << *_checksum; + return Status::OK(); } } // namespace doris diff --git a/be/src/olap/task/engine_checksum_task.h b/be/src/olap/task/engine_checksum_task.h index 2979e9b000..04afa1a5cd 100644 --- a/be/src/olap/task/engine_checksum_task.h +++ b/be/src/olap/task/engine_checksum_task.h @@ -43,6 +43,7 @@ private: TTabletId _tablet_id; TSchemaHash _schema_hash; TVersion _version; + uint32_t* _checksum; std::shared_ptr<MemTrackerLimiter> _mem_tracker; }; // EngineTask --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org