This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new cc924c9 [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632) cc924c9 is described below commit cc924c9e6aac56b245defd6aff6cd6529e53e9c7 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Thu Jan 2 14:10:05 2020 +0800 [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632) When merge-reading from one rowset with multiple overlapping segments, I introduce a priority queue (a min-heap data structure) for multi-way merge sort, to replace the old N*M time complexity algorithm. This can significantly improve the read efficiency when merging a large number of overlapping data. In my test: 1. Compaction with 187 segments reduces the time from 75 seconds to 42 seconds. 2. Compaction with 3574 segments costs 43 seconds; with the old version, I killed the process after waiting more than 10 minutes... This CL only changes the reads of alpha rowsets. Beta rowsets will be changed in another CL. ISSUE: #2631 --- be/src/olap/rowset/alpha_rowset_reader.cpp | 61 ++++++++++++++++++++++++++- be/src/olap/rowset/alpha_rowset_reader.h | 22 ++++++++++ be/test/olap/rowset/alpha_rowset_test.cpp | 66 ++++++++++++++++++++++++++++++ run-ut.sh | 2 +- 4 files changed, 149 insertions(+), 2 deletions(-) diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp index 4dd3261..18aec29 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.cpp +++ b/be/src/olap/rowset/alpha_rowset_reader.cpp @@ -85,6 +85,7 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { *(_current_read_context->seek_columns)); } } + RETURN_NOT_OK(_init_merge_heap()); } else { _next_block = &AlphaRowsetReader::_union_block; } @@ -141,13 +142,16 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { size_t num_rows_in_block = 0; while (_read_block->pos() < _num_rows_per_row_block) { RowCursor* row_cursor = nullptr; - status = 
_pull_next_row_for_merge_rowset(&row_cursor); + status = _pull_next_row_for_merge_rowset_v2(&row_cursor); if (status == OLAP_ERR_DATA_EOF && _read_block->pos() > 0) { status = OLAP_SUCCESS; break; } else if (status != OLAP_SUCCESS) { return status; } + + VLOG(10) << "get merged row: " << row_cursor->to_string(); + _read_block->get_row(_read_block->pos(), _dst_cursor); copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool()); _read_block->pos_inc(); @@ -160,6 +164,57 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { return status; } +OLAPStatus AlphaRowsetReader::_init_merge_heap() { + if (_merge_heap.empty() && !_merge_ctxs.empty()) { + for (auto& merge_ctx : _merge_ctxs) { + RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(&merge_ctx)); + } + } + return OLAP_SUCCESS; +} + +OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx) { + if (merge_ctx->is_eof) { + // nothing in this merge ctx, just return + return OLAP_SUCCESS; + } + + // get next row block of this merge ctx + if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) { + OLAPStatus status = _pull_next_block(merge_ctx); + if (status == OLAP_ERR_DATA_EOF) { + merge_ctx->is_eof = true; + return OLAP_SUCCESS; + } else if (status != OLAP_SUCCESS) { + LOG(WARNING) << "read next row of singleton rowset failed:" << status; + return status; + } + } + + // read the first row, push it into merge heap, and step forward + RowCursor* current_row = merge_ctx->row_cursor.get(); + merge_ctx->row_block->get_row(merge_ctx->row_block->pos(), current_row); + _merge_heap.push(merge_ctx); + merge_ctx->row_block->pos_inc(); + return OLAP_SUCCESS; +} + +OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row) { + // if _merge_heap is not empty, return the row at top, and insert a new row + // from conresponding merge_ctx + if (!_merge_heap.empty()) { + AlphaMergeContext* merge_ctx = _merge_heap.top(); + *row = 
merge_ctx->row_cursor.get(); + _merge_heap.pop(); + + RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); + return OLAP_SUCCESS; + } else { + // all rows are read + return OLAP_ERR_DATA_EOF; + } +} + OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset(RowCursor** row) { RowCursor* min_row = nullptr; int min_index = -1; @@ -326,4 +381,8 @@ RowsetSharedPtr AlphaRowsetReader::rowset() { return std::static_pointer_cast<Rowset>(_rowset); } +bool AlphaMergeContextComparator::operator() (const AlphaMergeContext* x, const AlphaMergeContext* y) const { + return compare_row(*(x->row_cursor.get()), *(y->row_cursor.get())) > 0; +} + } // namespace doris diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index 7d96192..3f3eb58 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -25,6 +25,7 @@ #include "olap/rowset/alpha_rowset_meta.h" #include <vector> +#include <queue> namespace doris { @@ -41,6 +42,12 @@ struct AlphaMergeContext { RowBlock* row_block = nullptr; std::unique_ptr<RowCursor> row_cursor = nullptr; + + bool is_eof = false; +}; + +struct AlphaMergeContextComparator { + bool operator () (const AlphaMergeContext* x, const AlphaMergeContext* y) const; }; class AlphaRowsetReader : public RowsetReader { @@ -79,6 +86,18 @@ private: // current scan key to next scan key. OLAPStatus _pull_first_block(AlphaMergeContext* merge_ctx); + // merge by priority queue(_merge_heap) + // this method has same function with _pull_next_row_for_merge_rowset, but using heap merge. + // and this should replace the _pull_next_row_for_merge_rowset later. + OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row); + // init the merge heap, this should be call before calling _pull_next_row_for_merge_rowset_v2(); + OLAPStatus _init_merge_heap(); + // update the merge ctx. + // 1. get next row block of this ctx, if current row block is empty. + // 2. 
read the current row of the row block and push it to merge heap. + // 3. point to the next row of the row block + OLAPStatus _update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx); + private: int _num_rows_per_row_block; AlphaRowsetSharedPtr _rowset; @@ -103,6 +122,9 @@ private: RowsetReaderContext* _current_read_context; OlapReaderStatistics _owned_stats; OlapReaderStatistics* _stats = &_owned_stats; + + // a priority queue for merging rowsets + std::priority_queue<AlphaMergeContext*, vector<AlphaMergeContext*>, AlphaMergeContextComparator> _merge_heap; }; } // namespace doris diff --git a/be/test/olap/rowset/alpha_rowset_test.cpp b/be/test/olap/rowset/alpha_rowset_test.cpp index c1ea009..1c26bd3 100644 --- a/be/test/olap/rowset/alpha_rowset_test.cpp +++ b/be/test/olap/rowset/alpha_rowset_test.cpp @@ -250,6 +250,72 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) { ASSERT_EQ(1, row_block->remaining()); } +TEST_F(AlphaRowsetTest, TestRowCursorWithOrdinal) { + TabletSchema tablet_schema; + create_tablet_schema(AGG_KEYS, &tablet_schema); + + RowCursor* row1 = new (std::nothrow) RowCursor(); // 10, "well", 100 + row1->init(tablet_schema); + int32_t field1_0 = 10; + row1->set_not_null(0); + row1->set_field_content(0, reinterpret_cast<char*>(&field1_0), _mem_pool.get()); + Slice field1_1("well"); + row1->set_not_null(1); + row1->set_field_content(1, reinterpret_cast<char*>(&field1_1), _mem_pool.get()); + int32_t field1_2 = 100; + row1->set_not_null(2); + row1->set_field_content(2, reinterpret_cast<char*>(&field1_2), _mem_pool.get()); + + RowCursor* row2 = new (std::nothrow) RowCursor(); // 11, "well", 100 + row2->init(tablet_schema); + int32_t field2_0 = 11; + row2->set_not_null(0); + row2->set_field_content(0, reinterpret_cast<char*>(&field2_0), _mem_pool.get()); + Slice field2_1("well"); + row2->set_not_null(1); + row2->set_field_content(1, reinterpret_cast<char*>(&field2_1), _mem_pool.get()); + int32_t field2_2 = 100; + row2->set_not_null(2); + 
row2->set_field_content(2, reinterpret_cast<char*>(&field2_2), _mem_pool.get()); + + RowCursor* row3 = new (std::nothrow) RowCursor(); // 11, "good", 100 + row3->init(tablet_schema); + int32_t field3_0 = 11; + row3->set_not_null(0); + row3->set_field_content(0, reinterpret_cast<char*>(&field3_0), _mem_pool.get()); + Slice field3_1("good"); + row3->set_not_null(1); + row3->set_field_content(1, reinterpret_cast<char*>(&field3_1), _mem_pool.get()); + int32_t field3_2 = 100; + row3->set_not_null(2); + row3->set_field_content(2, reinterpret_cast<char*>(&field3_2), _mem_pool.get()); + + std::priority_queue<AlphaMergeContext*, std::vector<AlphaMergeContext*>, AlphaMergeContextComparator> queue; + AlphaMergeContext ctx1; + ctx1.row_cursor.reset(row1); + AlphaMergeContext ctx2; + ctx2.row_cursor.reset(row2); + AlphaMergeContext ctx3; + ctx3.row_cursor.reset(row3); + + queue.push(&ctx1); + queue.push(&ctx2); + queue.push(&ctx3); + + // should be: + // row1, row3, row2 + AlphaMergeContext* top1 = queue.top(); + ASSERT_EQ(top1, &ctx1); + queue.pop(); + AlphaMergeContext* top2 = queue.top(); + ASSERT_EQ(top2, &ctx3); + queue.pop(); + AlphaMergeContext* top3 = queue.top(); + ASSERT_EQ(top3, &ctx2); + queue.pop(); + ASSERT_TRUE(queue.empty()); +} + } // namespace doris int main(int argc, char **argv) { diff --git a/run-ut.sh b/run-ut.sh index 03432a6..f079703 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -253,7 +253,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/serialize_test # ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test ${DORIS_TEST_BINARY_DIR}/olap/options_test -# Running routine load test +# Running segment v2 test ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test ${DORIS_TEST_BINARY_DIR}/olap/tablet_mgr_test ${DORIS_TEST_BINARY_DIR}/olap/olap_meta_test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org