HappenLee commented on code in PR #9314:
URL: https://github.com/apache/incubator-doris/pull/9314#discussion_r862587595


##########
be/src/exec/broker_scanner.cpp:
##########
@@ -560,8 +570,9 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) 
{
         str_slot->len = value.size;
     }
 
+    const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.num_of_columns_from_file) {
-        fill_slots_of_columns_from_path(range.num_of_columns_from_file, 
columns_from_path);
+        fill_slots_of_columns_from_path(range.num_of_columns_from_file, 
range.columns_from_path);
     }
 
     _success = true;

Review Comment:
   The function `line_split_to_values` already sets `_success`, so there is no need to set it again here.



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, 
block.get(),
+                                                    
_tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
+        }
 
-            RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
-            if (scanner_eof) {
-                break;
-            }
+        // merge block
+        if (_mutable_block.get() == nullptr) {
+            _mutable_block.reset(new MutableBlock(block->clone_empty()));
         }
 
-        if (!columns[0]->empty()) {
-            auto n_columns = 0;
-            for (const auto slot_desc : _tuple_desc->slots()) {
-                
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                    
slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+        int row_wait_add = block->rows();
+        int begin = 0;
+        while (row_wait_add > 0) {
+            int row_add = 0;
+            int max_add = batch_size - _mutable_block->rows();
+            if (row_wait_add >= max_add) {
+                row_add = max_add;
+            } else {
+                row_add = row_wait_add;
             }
 
-            auto old_rows = block->rows();
-
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, 
block.get(),
-                                                       
_tuple_desc->slots().size()));
+            _mutable_block->add_rows(block.get(), begin, row_add);

Review Comment:
   This always copies the rows. Consider skipping the copy: only merge into `_mutable_block` when 
`_mutable_block.size() + block.size() < batch_size`; otherwise return (push) the block directly 
instead of copying it.



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
+        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
+        if (block->rows() == 0) {
+            continue;
+        }
+        auto old_rows = block->rows();
+        RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, 
block.get(),
+                                                    
_tuple_desc->slots().size()));
+        counter->num_rows_unselected += old_rows - block->rows();
+        if (block->rows() == 0) {
+            continue;
+        }
 
-            RETURN_IF_ERROR(scanner->get_next(columns, &scanner_eof));
-            if (scanner_eof) {
-                break;
-            }
+        // merge block
+        if (_mutable_block.get() == nullptr) {
+            _mutable_block.reset(new MutableBlock(block->clone_empty()));
         }
 
-        if (!columns[0]->empty()) {
-            auto n_columns = 0;
-            for (const auto slot_desc : _tuple_desc->slots()) {
-                
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                    
slot_desc->get_data_type_ptr(),
-                                                    slot_desc->col_name()));
+        int row_wait_add = block->rows();
+        int begin = 0;
+        while (row_wait_add > 0) {
+            int row_add = 0;
+            int max_add = batch_size - _mutable_block->rows();
+            if (row_wait_add >= max_add) {
+                row_add = max_add;
+            } else {
+                row_add = row_wait_add;
             }
 
-            auto old_rows = block->rows();
-
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, 
block.get(),
-                                                       
_tuple_desc->slots().size()));
+            _mutable_block->add_rows(block.get(), begin, row_add);
+            row_wait_add -= row_add;
+            begin += row_add;
+            if (_mutable_block->rows() >= batch_size) {
+                RETURN_IF_ERROR(push_block_queue());
+            }
+        }
+    }
 
-            counter->num_rows_unselected += old_rows - block->rows();
+    RETURN_IF_ERROR(push_block_queue());
+    return Status::OK();
+}
 
-            std::unique_lock<std::mutex> l(_batch_queue_lock);
-            while (_process_status.ok() && !_scan_finished.load() &&
-                   !_runtime_state->is_cancelled() &&
-                   // stop pushing more batch if
-                   // 1. too many batches in queue, or
-                   // 2. at least one batch in queue and memory exceed limit.
-                   (_block_queue.size() >= _max_buffered_batches ||
-                    (mem_tracker()->any_limit_exceeded() && 
!_block_queue.empty()))) {
-                _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
-            }
-            // Process already set failed, so we just return OK
-            if (!_process_status.ok()) {
-                return Status::OK();
-            }
-            // Scan already finished, just return
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
-            // Runtime state is canceled, just return cancel
-            if (_runtime_state->is_cancelled()) {
-                return Status::Cancelled("Cancelled");
-            }
-            // Queue size Must be smaller than _max_buffered_batches
-            _block_queue.push_back(block);
+Status VBrokerScanNode::push_block_queue() {
+    if (_mutable_block.get() == nullptr || _mutable_block->rows() == 0) {
+        return Status::OK();
+    }
 
-            // Notify reader to
-            _queue_reader_cond.notify_one();
-        }
+    auto output_block = _mutable_block->to_block();
+    std::shared_ptr<vectorized::Block> block_ptr(new vectorized::Block());
+    block_ptr->swap(std::move(output_block));

Review Comment:
   This can be simplified to a single call, without the intermediate `output_block`: `block_ptr->swap(_mutable_block->to_block());`



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -121,73 +121,94 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
     bool scanner_eof = false;
 
     const int batch_size = _runtime_state->batch_size();
-    size_t slot_num = _tuple_desc->slots().size();
 
     while (!scanner_eof) {
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        std::vector<vectorized::MutableColumnPtr> columns(slot_num);
-        for (int i = 0; i < slot_num; i++) {
-            columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column();
+        RETURN_IF_CANCELLED(_runtime_state);
+        // If we have finished all works
+        if (_scan_finished.load() || !_process_status.ok()) {
+            return Status::OK();
         }
 
-        while (columns[0]->size() < batch_size && !scanner_eof) {
-            RETURN_IF_CANCELLED(_runtime_state);
-            // If we have finished all works
-            if (_scan_finished.load()) {
-                return Status::OK();
-            }
+        std::shared_ptr<vectorized::Block> block(new vectorized::Block());

Review Comment:
   Why is a `shared_ptr` used here instead of a `unique_ptr`?



##########
be/src/exec/base_scanner.cpp:
##########
@@ -133,6 +140,15 @@ Status BaseScanner::init_expr_ctxes() {
                << ", name=" << slot_desc->col_name();
             return Status::InternalError(ss.str());
         }
+
+        if (_state->enable_vectorized_exec()) {
+            vectorized::VExprContext* ctx = nullptr;
+            
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), 
it->second, &ctx));
+            RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), 
_mem_tracker));
+            RETURN_IF_ERROR(ctx->open(_state));
+            _dest_vexpr_ctx.emplace_back(ctx);
+        }

Review Comment:
   Add an `else {}` branch here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to