This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new fcd25b53bf  [Optimize](Random distribution) Improve the performance of tablet sin… (#17389)
fcd25b53bf is described below

commit fcd25b53bf4d91608537a75a1717797e1deeb62b
Author: lihangyu <15605149...@163.com>
AuthorDate: Fri Mar 10 10:52:40 2023 +0800

    [Optimize](Random distribution) Improve the performance of tablet sin… (#17389)

    The current distribution model of Doris works as follows: OlapTableSink splits the
    original Block into several sub-blocks, one per node (BE), according to the tablet
    distribution, and sends the sub-blocks to the storage engines of the backends; each
    storage engine then splits its sub-block across multiple tablet channels, and each
    DeltaWriter handles part of the block.

    This model causes blocks to be split according to tablets, and the splitting process can
    be a relatively heavy operation. After splitting, the blocks are distributed to different
    DeltaWriters (Memtables) through RPCs to TabletChannels. The distribution operation on
    TabletChannels is also a relatively heavy operation. If the distribution property of the
    table is RANDOM distribution, then we have the opportunity to distribute the blocks
    according to the complete block during distributio [...]

    This optimization can save 10% ~ 20% of the CPU cost of a RANDOM-distribution table load
    when load_to_single_tablet is enabled.
---
 be/src/olap/delta_writer.cpp                   |  18 +++-
 be/src/olap/delta_writer.h                     |   5 +-
 be/src/olap/memtable.cpp                       |  11 +-
 be/src/olap/memtable.h                         |   3 +-
 be/src/runtime/tablets_channel.cpp             |  38 +++++--
 be/src/vec/sink/vtablet_sink.cpp               | 119 +++++++++++++++------
 be/src/vec/sink/vtablet_sink.h                 |  13 ++-
 gensrc/proto/internal_service.proto            |   1 +
 .../load_p0/stream_load/test_json_load.groovy  |  17 +--
 9 files changed, 166 insertions(+), 59 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index d5f2446b52..1280925baf 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "olap/delta_writer.h"
 
+#include "common/status.h"
 #include "exec/tablet_info.h"
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
@@ -157,8 +158,13 @@ Status DeltaWriter::init() {
     return Status::OK();
 }
 
-Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) {
-    if (UNLIKELY(row_idxs.empty())) {
+Status DeltaWriter::append(const vectorized::Block* block) {
+    return write(block, {}, true);
+}
+
+Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs,
+                          bool is_append) {
+    if (UNLIKELY(row_idxs.empty() && !is_append)) {
         return Status::OK();
     }
     std::lock_guard<std::mutex> l(_lock);
@@ -176,8 +182,12 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         return Status::Error<ALREADY_CLOSED>();
     }
 
-    _total_received_rows += row_idxs.size();
-    _mem_table->insert(block, row_idxs);
+    if (is_append) {
+        _total_received_rows += block->rows();
+    } else {
+        _total_received_rows += row_idxs.size();
+    }
+    _mem_table->insert(block, row_idxs, is_append);
 
     if (UNLIKELY(_mem_table->need_agg())) {
         _mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 82dd424564..0b133e6ac9 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -60,7 +60,10 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<int>& row_idxs);
+    Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
+                 bool is_append = false);
+
+    Status append(const vectorized::Block* block);
 
     // flush the last memtable to flush queue, must call it before close_wait()
     Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 416107eb81..f207c2a270 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -147,7 +147,8 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                  *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) {
+void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs,
+                      bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
     vectorized::Block target_block = *input_block;
     if (!_tablet_schema->is_dynamic_schema()) {
@@ -176,7 +177,13 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
 
-    _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
+    if (is_append) {
+        // Append the block, call insert range from
+        _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
+        num_rows = target_block.rows();
+    } else {
+        _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
+    }
     size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows();
     _mem_usage += input_size;
     _insert_mem_tracker->consume(input_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 62f211e12a..ed2e672e5c 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -51,7 +51,8 @@ public:
         return _insert_mem_tracker->consumption() + _flush_mem_tracker->consumption();
     }
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
+    void insert(const vectorized::Block* block, const std::vector<int>& row_idxs,
+                bool is_append = false);
 
     void shrink_memtable_by_agg();
 
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index af423d0269..57f795f912 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -309,6 +309,9 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */>
             tablet_to_rowidxs;
     for (int i = 0; i < request.tablet_ids_size(); ++i) {
+        if (request.is_single_tablet_block()) {
+            break;
+        }
         int64_t tablet_id = request.tablet_ids(i);
         if (_is_broken_tablet(tablet_id)) {
             // skip broken tablets
@@ -326,29 +329,42 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
     auto get_send_data = [&]() { return vectorized::Block(request.block()); };
 
     auto send_data = get_send_data();
-    google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
-            response->mutable_tablet_errors();
-    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-        auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
+
+    auto write_tablet_data = [&](uint32_t tablet_id,
+                                 std::function<Status(DeltaWriter * writer)> write_func) {
+        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+                response->mutable_tablet_errors();
+        auto tablet_writer_it = _tablet_writers.find(tablet_id);
         if (tablet_writer_it == _tablet_writers.end()) {
-            return Status::InternalError("unknown tablet to append data, tablet={}",
-                                         tablet_to_rowidxs_it.first);
+            return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
         }
-
-        Status st = tablet_writer_it->second->write(&send_data, tablet_to_rowidxs_it.second);
+        Status st = write_func(tablet_writer_it->second);
         if (!st.ok()) {
             auto err_msg =
                     fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
-                                tablet_to_rowidxs_it.first, _txn_id, st.to_string());
+                                tablet_id, _txn_id, st.to_string());
             LOG(WARNING) << err_msg;
             PTabletError* error = tablet_errors->Add();
-            error->set_tablet_id(tablet_to_rowidxs_it.first);
+            error->set_tablet_id(tablet_id);
             error->set_msg(err_msg);
             tablet_writer_it->second->cancel_with_status(st);
-            _add_broken_tablet(tablet_to_rowidxs_it.first);
+            _add_broken_tablet(tablet_id);
             // continue write to other tablet.
             // the error will return back to sender.
         }
+        return Status::OK();
+    };
+
+    if (request.is_single_tablet_block()) {
+        RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), [&](DeltaWriter* writer) {
+            return writer->append(&send_data);
+        }));
+    } else {
+        for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+            RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) {
+                return writer->write(&send_data, tablet_to_rowidxs_it.second);
+            }));
+        }
     }
 
     {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index ac58b4f3e7..59c2a445bd 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -22,6 +22,8 @@
 #include <string>
 #include <unordered_map>
 
+#include "common/logging.h"
+#include "common/status.h"
 #include "exec/tablet_info.h"
 #include "olap/hll.h"
 #include "runtime/exec_env.h"
@@ -36,9 +38,12 @@
 #include "util/threadpool.h"
 #include "util/time.h"
 #include "util/uid_util.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_array.h"
 #include "vec/columns/column_struct.h"
+#include "vec/columns/columns_number.h"
 #include "vec/core/block.h"
+#include "vec/core/types.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
@@ -406,8 +411,12 @@ Status VNodeChannel::open_wait() {
 
 Status VNodeChannel::add_block(vectorized::Block* block,
                                const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                               std::vector<int64_t>>& payload) {
+                                               std::vector<int64_t>>& payload,
+                               bool is_append) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
+    if (payload.second.empty()) {
+        return Status::OK();
+    }
     // If add_block() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
@@ -439,12 +448,30 @@ Status VNodeChannel::add_block(vectorized::Block* block,
         // this will align _cur_mutable_block with block and auto extends columns
         _cur_mutable_block->set_block_type(vectorized::BlockType::DYNAMIC);
     }
-    block->append_block_by_selector(_cur_mutable_block.get(), *(payload.first));
-    for (auto tablet_id : payload.second) {
-        _cur_add_block_request.add_tablet_ids(tablet_id);
+
+    if (is_append) {
+        // Do not split the data of the block by tablets but append it to a single delta writer.
+        // This is a faster way to send block than append_block_by_selector
+        // TODO: we could write to local delta writer if single_replica_load is true
+        VLOG_DEBUG << "send whole block by append block";
+        std::vector<int64_t> tablets(block->rows(), payload.second[0]);
+        vectorized::MutableColumns& columns = _cur_mutable_block->mutable_columns();
+        columns.clear();
+        columns.reserve(block->columns());
+        // Hold the reference of block columns to avoid copying
+        for (auto column : block->get_columns()) {
+            columns.push_back(column->assume_mutable());
+        }
+        *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()};
+        _cur_add_block_request.set_is_single_tablet_block(true);
+    } else {
+        block->append_block_by_selector(_cur_mutable_block.get(), *(payload.first));
+        for (auto tablet_id : payload.second) {
+            _cur_add_block_request.add_tablet_ids(tablet_id);
+        }
     }
 
-    if (_cur_mutable_block->rows() >= _batch_size ||
+    if (is_append || _cur_mutable_block->rows() >= _batch_size ||
         _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
@@ -1011,6 +1038,31 @@ Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block
     return status;
 }
 
+void VOlapTableSink::_generate_row_distribution_payload(
+        ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition,
+        uint32_t tablet_index, int row_idx, size_t row_cnt) {
+    // Generate channel payload for sinking data to different node channel
+    for (int j = 0; j < partition->indexes.size(); ++j) {
+        auto tid = partition->indexes[j].tablets[tablet_index];
+        auto it = _channels[j]->_channels_by_tablet.find(tid);
+        DCHECK(it != _channels[j]->_channels_by_tablet.end())
+                << "unknown tablet, tablet_id=" << tablet_index;
+        for (const auto& channel : it->second) {
+            if (channel_to_payload[j].count(channel.get()) < 1) {
+                channel_to_payload[j].insert(
+                        {channel.get(), std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
+                                                  std::vector<int64_t>> {
+                                                std::unique_ptr<vectorized::IColumn::Selector>(
+                                                        new vectorized::IColumn::Selector()),
+                                                std::vector<int64_t>()}});
+            }
+            channel_to_payload[j][channel.get()].first->push_back(row_idx);
+            channel_to_payload[j][channel.get()].second.push_back(tid);
+        }
+        _number_output_rows += row_cnt;
+    }
+}
+
 Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
     INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VOlapTableSink::send");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -1021,7 +1073,6 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
 
     if (UNLIKELY(rows == 0)) {
         return status;
     }
-    SCOPED_TIMER(_profile->total_time_counter());
     _number_input_rows += rows;
     // update incrementally so that FE can get the progress.
@@ -1061,52 +1112,58 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
     bool stop_processing = false;
-    if (findTabletMode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
-        _partition_to_tablet_map.clear();
-    }
-
     std::vector<std::unordered_map<
             VNodeChannel*, std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
                                      std::vector<int64_t>>>>
             channel_to_payload;
     channel_to_payload.resize(_channels.size());
+    if (findTabletMode == FIND_TABLET_EVERY_BATCH) {
+        // Recalculate is needed
+        _partition_to_tablet_map.clear();
+    }
     for (int i = 0; i < num_rows; ++i) {
-        if (filtered_rows > 0 && _filter_bitmap.Get(i)) {
+        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
             continue;
         }
         const VOlapTablePartition* partition = nullptr;
-        uint32_t tablet_index = 0;
         bool is_continue = false;
+        uint32_t tablet_index = 0;
         RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
                                     is_continue));
         if (is_continue) {
            continue;
         }
-        for (int j = 0; j < partition->indexes.size(); ++j) {
-            auto tid = partition->indexes[j].tablets[tablet_index];
-            auto it = _channels[j]->_channels_by_tablet.find(tid);
-            DCHECK(it != _channels[j]->_channels_by_tablet.end())
-                    << "unknown tablet, tablet_id=" << tablet_index;
-            for (const auto& channel : it->second) {
-                if (channel_to_payload[j].count(channel.get()) < 1) {
-                    channel_to_payload[j].insert(
-                            {channel.get(),
-                             std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                       std::vector<int64_t>> {
-                                     std::unique_ptr<vectorized::IColumn::Selector>(
-                                             new vectorized::IColumn::Selector()),
-                                     std::vector<int64_t>()}});
-                }
-                channel_to_payload[j][channel.get()].first->push_back(i);
-                channel_to_payload[j][channel.get()].second.push_back(tid);
+        // each row
+        _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1);
+    }
+    // Random distribution and the block belongs to a single tablet, we could optimize to append the whole
+    // block into node channel.
+    bool load_block_to_single_tablet =
+            !_schema->is_dynamic_schema() && _partition_to_tablet_map.size() == 1;
+    if (load_block_to_single_tablet) {
+        // clear and release the references of columns
+        input_block->clear();
+        // Filter block
+        if (filtered_rows > 0) {
+            auto filter = vectorized::ColumnUInt8::create(block.rows(), 0);
+            vectorized::UInt8* filter_data =
+                    static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data().data();
+            vectorized::IColumn::Filter& filter_col =
+                    static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
+            for (size_t i = 0; i < filter_col.size(); ++i) {
+                filter_data[i] = !_filter_bitmap.Get(i);
             }
-            _number_output_rows++;
+            vectorized::Block::filter_block_internal(&block, filter_col, block.columns());
         }
     }
+    // Add block to node channel
     for (size_t i = 0; i < _channels.size(); i++) {
         for (const auto& entry : channel_to_payload[i]) {
             // if this node channel is already failed, this add_row will be skipped
-            auto st = entry.first->add_block(&block, entry.second);
+            auto st = entry.first->add_block(
+                    &block, entry.second,
+                    // if it is load single tablet, then append this whole block
+                    load_block_to_single_tablet);
             if (!st.ok()) {
                 _channels[i]->mark_as_failed(entry.first->node_id(), entry.first->host(),
                                              st.to_string());
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 5200c29f37..c92f2619ff 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -39,6 +39,7 @@
 #include "util/spinlock.h"
 #include "util/thread.h"
 #include "vec/columns/column.h"
+#include "vec/columns/columns_number.h"
 #include "vec/core/block.h"
 
 namespace doris {
@@ -178,7 +179,8 @@ public:
 
     Status add_block(vectorized::Block* block,
                      const std::pair<std::unique_ptr<vectorized::IColumn::Selector>,
-                                     std::vector<int64_t>>& payload);
+                                     std::vector<int64_t>>& payload,
+                     bool is_append = false);
 
     int try_send_and_fetch_status(RuntimeState* state,
                                   std::unique_ptr<ThreadPoolToken>& thread_pool_token);
@@ -421,6 +423,15 @@ private:
     friend class VNodeChannel;
     friend class IndexChannel;
 
+    using ChannelDistributionPayload = std::vector<std::unordered_map<
+            VNodeChannel*,
+            std::pair<std::unique_ptr<vectorized::IColumn::Selector>, std::vector<int64_t>>>>;
+
+    // payload for each row
+    void _generate_row_distribution_payload(ChannelDistributionPayload& payload,
+                                            const VOlapTablePartition* partition,
+                                            uint32_t tablet_index, int row_idx, size_t row_cnt);
+
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.
     // invalid row number is set in Bitmap
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 3cff8fd733..964aab4139 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -136,6 +136,7 @@ message PTabletWriterAddBlockRequest {
     optional bool is_high_priority = 11 [default = false];
     optional bool write_single_replica = 12 [default = false];
     map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
+    optional bool is_single_tablet_block = 14 [default = false];
 };
 
 message PSlaveTabletNodes {
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index f874eb2b87..f6326129dc 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -26,7 +26,7 @@ suite("test_json_load", "p0") {
                         id INT DEFAULT '10',
                         city VARCHAR(32) DEFAULT '',
                         code BIGINT SUM DEFAULT '0')
-                        DISTRIBUTED BY HASH(id) BUCKETS 10
+                        DISTRIBUTED BY RANDOM BUCKETS 10
                         PROPERTIES("replication_num" = "1");
                         """
 
@@ -48,7 +48,7 @@ suite("test_json_load", "p0") {
                         CREATE TABLE IF NOT EXISTS ${testTable} (
                         id INT DEFAULT '10',
                         code BIGINT SUM DEFAULT '0')
-                        DISTRIBUTED BY HASH(id) BUCKETS 10
+                        DISTRIBUTED BY RANDOM BUCKETS 10
                         PROPERTIES("replication_num" = "1");
                         """
 
@@ -72,7 +72,7 @@ suite("test_json_load", "p0") {
                        id INT DEFAULT '10',
                        city VARCHAR(32) NOT NULL,
                        code BIGINT SUM DEFAULT '0')
-                        DISTRIBUTED BY HASH(id) BUCKETS 10
+                        DISTRIBUTED BY RANDOM BUCKETS 10
                         PROPERTIES("replication_num" = "1");
                         """
 
@@ -116,7 +116,7 @@ suite("test_json_load", "p0") {
 
     def load_json_data = {label, strip_flag, read_flag, format_flag, exprs, json_paths,
                           json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false,
-                          expected_succ_rows = -1 ->
+                          expected_succ_rows = -1, load_to_single_tablet = 'true' ->
 
         // load the json data
         streamLoad {
@@ -132,6 +132,7 @@ suite("test_json_load", "p0") {
             set 'json_root', json_root
            set 'where', where_expr
            set 'fuzzy_parse', fuzzy_flag
+            set 'load_to_single_tablet', load_to_single_tablet
            file file_name // import json file
            time 10000 // limit inflight 10s
            if (expected_succ_rows >= 0) {
@@ -496,13 +497,13 @@ suite("test_json_load", "p0") {
         test_invalid_json_array_table.call(testTable)
 
         load_json_data.call('test_json_load_case17', 'true', '', 'json', '', '',
-                            '', '', '', 'invalid_json_array.json', false, 0)
+                            '', '', '', 'invalid_json_array.json', false, 0, 'false')
         load_json_data.call('test_json_load_case17_1', 'true', '', 'json', '', '',
-                            '$.item', '', '', 'invalid_json_array1.json', false, 0)
+                            '$.item', '', '', 'invalid_json_array1.json', false, 0, 'false')
         load_json_data.call('test_json_load_case17_2', 'true', '', 'json', '', '',
-                            '$.item', '', '', 'invalid_json_array2.json', false, 0)
+                            '$.item', '', '', 'invalid_json_array2.json', false, 0, 'false')
         load_json_data.call('test_json_load_case17_3', 'true', '', 'json', '', '',
-                            '$.item', '', '', 'invalid_json_array3.json', false, 0)
+                            '$.item', '', '', 'invalid_json_array3.json', false, 0, 'false')
 
         sql "sync"
         qt_select17 "select * from ${testTable}"
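
For context, a minimal, hypothetical sketch (not part of the commit) of how this fast path is exercised
from the regression-test side, modeled on the test_json_load.groovy changes above: a table created with
DISTRIBUTED BY RANDOM plus a stream load that sets the load_to_single_tablet property. The suite name,
table name, and data file below are illustrative placeholders; the DDL and streamLoad properties mirror
what already appears in the diff.

    // Hypothetical suite; the pattern mirrors test_json_load.groovy in the diff above.
    suite("example_random_single_tablet", "p0") {
        def testTable = "example_random_single_tablet"

        sql """
            CREATE TABLE IF NOT EXISTS ${testTable} (
                id INT DEFAULT '10',
                city VARCHAR(32) DEFAULT '',
                code BIGINT SUM DEFAULT '0')
            DISTRIBUTED BY RANDOM BUCKETS 10
            PROPERTIES("replication_num" = "1");
            """

        streamLoad {
            table "${testTable}"
            set 'format', 'json'
            // opt in to the whole-block append path added by this commit
            set 'load_to_single_tablet', 'true'
            file 'example_rows.json' // placeholder data file
            time 10000 // limit inflight 10s
        }

        sql "sync"
        qt_select "select * from ${testTable}"
    }

As the commit message notes, the whole-block append only applies when the table uses RANDOM distribution
and load_to_single_tablet is enabled; otherwise the sink keeps the per-tablet split path.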