This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.0 by this push: new 2d5cd744dfa [Refactor](Sink) Remove is_append mode in table sink (#34549) 2d5cd744dfa is described below commit 2d5cd744dfa67bb0f69616556c96d79f27dd1219 Author: lihangyu <15605149...@163.com> AuthorDate: Wed May 8 20:31:57 2024 +0800 [Refactor](Sink) Remove is_append mode in table sink (#34549) --- be/src/olap/delta_writer.cpp | 17 +++---------- be/src/olap/delta_writer.h | 5 +--- be/src/olap/memtable.cpp | 11 ++------ be/src/olap/memtable.h | 3 +-- be/src/runtime/tablets_channel.cpp | 18 +++----------- be/src/vec/sink/vtablet_sink.cpp | 51 +++++--------------------------------- be/src/vec/sink/vtablet_sink.h | 2 +- 7 files changed, 19 insertions(+), 88 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 594bc7b630c..f33040de2cc 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -237,13 +237,8 @@ Status DeltaWriter::init() { return Status::OK(); } -Status DeltaWriter::append(const vectorized::Block* block) { - return write(block, {}, true); -} - -Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs, - bool is_append) { - if (UNLIKELY(row_idxs.empty() && !is_append)) { +Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) { + if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } _lock_watch.start(); @@ -263,12 +258,8 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int> _req.load_id.hi(), _req.load_id.lo(), _req.txn_id); } - if (is_append) { - _total_received_rows += block->rows(); - } else { - _total_received_rows += row_idxs.size(); - } - _mem_table->insert(block, row_idxs, is_append); + _total_received_rows += row_idxs.size(); + _mem_table->insert(block, row_idxs); if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) { _mem_table->shrink_memtable_by_agg(); diff --git 
a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index ede5ca1f03b..497881d585f 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -85,10 +85,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector<int>& row_idxs, - bool is_append = false); - - Status append(const vectorized::Block* block); + Status write(const vectorized::Block* block, const std::vector<int>& row_idxs); // flush the last memtable to flush queue, must call it before build_rowset() Status close(); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 5d272c1a754..8fe58c686be 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -185,8 +185,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r *_pblock, -1); } -void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs, - bool is_append) { +void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); vectorized::Block target_block = *input_block; if (!_tablet_schema->is_dynamic_schema()) { @@ -222,13 +221,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in auto num_rows = row_idxs.size(); size_t cursor_in_mutableblock = _input_mutable_block.rows(); - if (is_append) { - // Append the block, call insert range from - _input_mutable_block.add_rows(&target_block, 0, target_block.rows()); - num_rows = target_block.rows(); - } else { - _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); - } + _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows(); _mem_usage += input_size; _insert_mem_tracker->consume(input_size); diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 
7d74b8ce43f..016f11f61ac 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -188,8 +188,7 @@ public: _flush_mem_tracker->consumption(); } // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, const std::vector<int>& row_idxs, - bool is_append = false); + void insert(const vectorized::Block* block, const std::vector<int>& row_idxs); void shrink_memtable_by_agg(); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 17c84956f69..5293200d532 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -448,9 +448,6 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs; for (int i = 0; i < request.tablet_ids_size(); ++i) { - if (request.is_single_tablet_block()) { - break; - } int64_t tablet_id = request.tablet_ids(i); if (_is_broken_tablet(tablet_id)) { // skip broken tablets @@ -496,18 +493,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); }; - if (request.is_single_tablet_block()) { - SCOPED_TIMER(_write_block_timer); - RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), [&](DeltaWriter* writer) { - return writer->append(&send_data); + SCOPED_TIMER(_write_block_timer); + for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { + RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) { + return writer->write(&send_data, tablet_to_rowidxs_it.second); })); - } else { - SCOPED_TIMER(_write_block_timer); - for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { - RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) { - return writer->write(&send_data, tablet_to_rowidxs_it.second); - })); - } } { diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 
4fade8c7cdd..862c8dae7e2 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -525,7 +525,7 @@ Status VNodeChannel::open_wait() { return status; } -Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) { +Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) { SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); if (payload->second.empty()) { return Status::OK(); @@ -604,50 +604,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, } SCOPED_RAW_TIMER(&_stat.append_node_channel_ns); - if (is_append) { - if (_cur_mutable_block && !_cur_mutable_block->empty()) { - // When is-append is true, the previous block may not have been sent out yet. - // (e.x. The previous block is not load to single tablet, and its row num was - // 4064, which is smaller than the send batch size 8192). - // If we clear the previous block directly here, it will cause data loss. - { - SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); - std::lock_guard<std::mutex> l(_pending_batches_lock); - _pending_batches_bytes += _cur_mutable_block->allocated_bytes(); - _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request); - _pending_batches_num++; - VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this - << " pending_batches_bytes:" << _pending_batches_bytes - << " jobid:" << std::to_string(_state->load_job_id()) - << " loadinfo:" << _load_info; - } - _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); - _cur_add_block_request.clear_tablet_ids(); - } - // Do not split the data of the block by tablets but append it to a single delta writer. 
- // This is a faster way to send block than append_block_by_selector - // TODO: we could write to local delta writer if single_replica_load is true - VLOG_DEBUG << "send whole block by append block"; - std::vector<int64_t> tablets(block->rows(), payload->second[0]); - vectorized::MutableColumns& columns = _cur_mutable_block->mutable_columns(); - columns.clear(); - columns.reserve(block->columns()); - // Hold the reference of block columns to avoid copying - for (auto column : block->get_columns()) { - columns.push_back(column->assume_mutable()); - } - *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()}; - _cur_add_block_request.set_is_single_tablet_block(true); - } else { - block->append_block_by_selector(_cur_mutable_block.get(), *(payload->first)); - for (auto tablet_id : payload->second) { - _cur_add_block_request.add_tablet_ids(tablet_id); - } - // need to reset to false avoid load data to incorrect tablet. - _cur_add_block_request.set_is_single_tablet_block(false); + block->append_block_by_selector(_cur_mutable_block.get(), *(payload->first)); + for (auto tablet_id : payload->second) { + _cur_add_block_request.add_tablet_ids(tablet_id); } - if (is_append || _cur_mutable_block->rows() >= _batch_size || + if (_cur_mutable_block->rows() >= _batch_size || _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); @@ -1454,9 +1416,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, for (const auto& entry : channel_to_payload[i]) { // if this node channel is already failed, this add_row will be skipped auto st = entry.first->add_block( - &block, &entry.second, // if it is load single tablet, then append this whole block - load_block_to_single_tablet); + &block, &entry.second); if (!st.ok()) { _channels[i]->mark_as_failed(entry.first, st.to_string()); } diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 
6b7e0187c49..abdf44a7e43 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -229,7 +229,7 @@ public: Status open_wait(); - Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false); + Status add_block(vectorized::Block* block, const Payload* payload); int try_send_and_fetch_status(RuntimeState* state, std::unique_ptr<ThreadPoolToken>& thread_pool_token); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org