This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2d5cd744dfa [Refactor](Sink) Remove is_append mode in table sink (#34549)
2d5cd744dfa is described below

commit 2d5cd744dfa67bb0f69616556c96d79f27dd1219
Author: lihangyu <15605149...@163.com>
AuthorDate: Wed May 8 20:31:57 2024 +0800

    [Refactor](Sink) Remove is_append mode in table sink (#34549)
---
 be/src/olap/delta_writer.cpp       | 17 +++----------
 be/src/olap/delta_writer.h         |  5 +---
 be/src/olap/memtable.cpp           | 11 ++------
 be/src/olap/memtable.h             |  3 +--
 be/src/runtime/tablets_channel.cpp | 18 +++-----------
 be/src/vec/sink/vtablet_sink.cpp   | 51 +++++---------------------------------
 be/src/vec/sink/vtablet_sink.h     |  2 +-
 7 files changed, 19 insertions(+), 88 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 594bc7b630c..f33040de2cc 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -237,13 +237,8 @@ Status DeltaWriter::init() {
     return Status::OK();
 }
 
-Status DeltaWriter::append(const vectorized::Block* block) {
-    return write(block, {}, true);
-}
-
-Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs,
-                          bool is_append) {
-    if (UNLIKELY(row_idxs.empty() && !is_append)) {
+Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) {
+    if (UNLIKELY(row_idxs.empty())) {
         return Status::OK();
     }
     _lock_watch.start();
@@ -263,12 +258,8 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
                 _req.load_id.hi(), _req.load_id.lo(), _req.txn_id);
     }
 
-    if (is_append) {
-        _total_received_rows += block->rows();
-    } else {
-        _total_received_rows += row_idxs.size();
-    }
-    _mem_table->insert(block, row_idxs, is_append);
+    _total_received_rows += row_idxs.size();
+    _mem_table->insert(block, row_idxs);
 
     if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
         _mem_table->shrink_memtable_by_agg();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index ede5ca1f03b..497881d585f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -85,10 +85,7 @@ public:
 
     Status init();
 
-    Status write(const vectorized::Block* block, const std::vector<int>& row_idxs,
-                 bool is_append = false);
-
-    Status append(const vectorized::Block* block);
+    Status write(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
     // flush the last memtable to flush queue, must call it before build_rowset()
     Status close();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5d272c1a754..8fe58c686be 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -185,8 +185,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs,
-                      bool is_append) {
+void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) {
     SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
     vectorized::Block target_block = *input_block;
     if (!_tablet_schema->is_dynamic_schema()) {
@@ -222,13 +221,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
 
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    if (is_append) {
-        // Append the block, call insert range from
-        _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
-        num_rows = target_block.rows();
-    } else {
-        _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
-    }
+    _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
     size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows();
     _mem_usage += input_size;
     _insert_mem_tracker->consume(input_size);
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 7d74b8ce43f..016f11f61ac 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -188,8 +188,7 @@ public:
                _flush_mem_tracker->consumption();
     }
     // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, const std::vector<int>& row_idxs,
-                bool is_append = false);
+    void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
     void shrink_memtable_by_agg();
 
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 17c84956f69..5293200d532 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -448,9 +448,6 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
 
     std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index */> tablet_to_rowidxs;
     for (int i = 0; i < request.tablet_ids_size(); ++i) {
-        if (request.is_single_tablet_block()) {
-            break;
-        }
         int64_t tablet_id = request.tablet_ids(i);
         if (_is_broken_tablet(tablet_id)) {
             // skip broken tablets
@@ -496,18 +493,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
         return Status::OK();
     };
 
-    if (request.is_single_tablet_block()) {
-        SCOPED_TIMER(_write_block_timer);
-        RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), [&](DeltaWriter* writer) {
-            return writer->append(&send_data);
+    SCOPED_TIMER(_write_block_timer);
+    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
+        RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) {
+            return writer->write(&send_data, tablet_to_rowidxs_it.second);
         }));
-    } else {
-        SCOPED_TIMER(_write_block_timer);
-        for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-            RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](DeltaWriter* writer) {
-                return writer->write(&send_data, tablet_to_rowidxs_it.second);
-            }));
-        }
     }
 
     {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 4fade8c7cdd..862c8dae7e2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -525,7 +525,7 @@ Status VNodeChannel::open_wait() {
     return status;
 }
 
-Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload, bool is_append) {
+Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload) {
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     if (payload->second.empty()) {
         return Status::OK();
@@ -604,50 +604,12 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
     }
 
     SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
-    if (is_append) {
-        if (_cur_mutable_block && !_cur_mutable_block->empty()) {
-            // When is-append is true, the previous block may not have been sent out yet.
-            // (e.x. The previous block is not load to single tablet, and its row num was
-            // 4064, which is smaller than the send batch size 8192).
-            // If we clear the previous block directly here, it will cause data loss.
-            {
-                SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
-                std::lock_guard<std::mutex> l(_pending_batches_lock);
-                _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
-                _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
-                _pending_batches_num++;
-                VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this
-                           << " pending_batches_bytes:" << _pending_batches_bytes
-                           << " jobid:" << std::to_string(_state->load_job_id())
-                           << " loadinfo:" << _load_info;
-            }
-            _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
-            _cur_add_block_request.clear_tablet_ids();
-        }
-        // Do not split the data of the block by tablets but append it to a single delta writer.
-        // This is a faster way to send block than append_block_by_selector
-        // TODO: we could write to local delta writer if single_replica_load is true
-        VLOG_DEBUG << "send whole block by append block";
-        std::vector<int64_t> tablets(block->rows(), payload->second[0]);
-        vectorized::MutableColumns& columns = _cur_mutable_block->mutable_columns();
-        columns.clear();
-        columns.reserve(block->columns());
-        // Hold the reference of block columns to avoid copying
-        for (auto column : block->get_columns()) {
-            columns.push_back(column->assume_mutable());
-        }
-        *_cur_add_block_request.mutable_tablet_ids() = {tablets.begin(), tablets.end()};
-        _cur_add_block_request.set_is_single_tablet_block(true);
-    } else {
-        block->append_block_by_selector(_cur_mutable_block.get(), *(payload->first));
-        for (auto tablet_id : payload->second) {
-            _cur_add_block_request.add_tablet_ids(tablet_id);
-        }
-        // need to reset to false avoid load data to incorrect tablet.
-        _cur_add_block_request.set_is_single_tablet_block(false);
+    block->append_block_by_selector(_cur_mutable_block.get(), *(payload->first));
+    for (auto tablet_id : payload->second) {
+        _cur_add_block_request.add_tablet_ids(tablet_id);
     }
 
-    if (is_append || _cur_mutable_block->rows() >= _batch_size ||
+    if (_cur_mutable_block->rows() >= _batch_size ||
         _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
@@ -1454,9 +1416,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
         for (const auto& entry : channel_to_payload[i]) {
             // if this node channel is already failed, this add_row will be skipped
             auto st = entry.first->add_block(
-                    &block, &entry.second,
                     // if it is load single tablet, then append this whole block
-                    load_block_to_single_tablet);
+                    &block, &entry.second);
             if (!st.ok()) {
                 _channels[i]->mark_as_failed(entry.first, st.to_string());
             }
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 6b7e0187c49..abdf44a7e43 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -229,7 +229,7 @@ public:
 
     Status open_wait();
 
-    Status add_block(vectorized::Block* block, const Payload* payload, bool is_append = false);
+    Status add_block(vectorized::Block* block, const Payload* payload);
 
     int try_send_and_fetch_status(RuntimeState* state,
                                   std::unique_ptr<ThreadPoolToken>& thread_pool_token);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to