github-actions[bot] commented on code in PR #19251:
URL: https://github.com/apache/doris/pull/19251#discussion_r1194869158


##########
be/src/olap/push_handler.cpp:
##########
@@ -372,7 +399,151 @@ Status PushBrokerReader::next(vectorized::Block* block) {
     if (!_ready || block == nullptr) {
         return Status::Error<INVALID_ARGUMENT>();
     }
-    _scanner->get_next(block, &_eof);
+    if (_cur_reader == nullptr || _cur_reader_eof) {
+        RETURN_IF_ERROR(_get_next_reader());
+        if (_eof) {
+            return Status::OK();
+        }
+    }
+    RETURN_IF_ERROR(_init_src_block());
+    size_t read_rows = 0;
+    RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &read_rows, 
&_cur_reader_eof));
+    if (read_rows > 0) {
+        RETURN_IF_ERROR(_cast_to_input_block());
+        RETURN_IF_ERROR(_convert_to_output_block(block));
+    }
+    return Status::OK();
+}
+
+Status PushBrokerReader::close() {
+    _ready = false;
+    for (auto ctx : _dest_vexpr_ctx) {
+        if (ctx != nullptr) {
+            ctx->close(_runtime_state.get());
+        }
+    }
+
+    if (_push_down_expr) {
+        _push_down_expr->close(_runtime_state.get());
+    }
+
+    for (auto& [k, v] : _slot_id_to_filter_conjuncts) {
+        for (auto& ctx : v) {
+            if (ctx != nullptr) {
+                ctx->close(_runtime_state.get());
+            }
+        }
+    }
+
+    for (auto* ctx : _not_single_slot_filter_conjuncts) {
+        if (ctx != nullptr) {
+            ctx->close(_runtime_state.get());
+        }
+    }
+    return Status::OK();
+}
+
+Status PushBrokerReader::_init_src_block() {
+    _src_block.clear();
+    int idx = 0;
+    for (auto& slot : _src_slot_descs) {
+        vectorized::DataTypePtr data_type;
+        auto it = _name_to_col_type.find(slot->col_name());
+        if (it == _name_to_col_type.end() || _is_dynamic_schema) {
+            // not exist in file, using type from _input_tuple_desc
+            data_type = 
vectorized::DataTypeFactory::instance().create_data_type(
+                    slot->type(), slot->is_nullable());
+        } else {
+            data_type = 
vectorized::DataTypeFactory::instance().create_data_type(it->second, true);
+        }
+        if (data_type == nullptr) {
+            return Status::NotSupported("Not support data type {} for column 
{}",
+                                        it == _name_to_col_type.end() ? 
slot->type().debug_string()
+                                                                      : 
it->second.debug_string(),
+                                        slot->col_name());
+        }
+        vectorized::MutableColumnPtr data_column = data_type->create_column();
+        
_src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), 
data_type,
+                                                            slot->col_name()));
+        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
+    }
+    _src_block_ptr = &_src_block;
+    return Status::OK();
+}
+
+Status PushBrokerReader::_cast_to_input_block() {

Review Comment:
   warning: method '_cast_to_input_block' can be made const [readability-make-member-function-const]
   
   ```suggestion
   Status PushBrokerReader::_cast_to_input_block() const {
   ```
   
   be/src/olap/push_handler.h:105:
   ```diff
   -     Status _cast_to_input_block();
   +     Status _cast_to_input_block() const;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to