github-actions[bot] commented on code in PR #19251: URL: https://github.com/apache/doris/pull/19251#discussion_r1194869158
##########
be/src/olap/push_handler.cpp:
##########
@@ -372,7 +399,151 @@ Status PushBrokerReader::next(vectorized::Block* block) {
     if (!_ready || block == nullptr) {
         return Status::Error<INVALID_ARGUMENT>();
     }
-    _scanner->get_next(block, &_eof);
+    if (_cur_reader == nullptr || _cur_reader_eof) {
+        RETURN_IF_ERROR(_get_next_reader());
+        if (_eof) {
+            return Status::OK();
+        }
+    }
+    RETURN_IF_ERROR(_init_src_block());
+    size_t read_rows = 0;
+    RETURN_IF_ERROR(_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
+    if (read_rows > 0) {
+        RETURN_IF_ERROR(_cast_to_input_block());
+        RETURN_IF_ERROR(_convert_to_output_block(block));
+    }
+    return Status::OK();
+}
+
+Status PushBrokerReader::close() {
+    _ready = false;
+    for (auto ctx : _dest_vexpr_ctx) {
+        if (ctx != nullptr) {
+            ctx->close(_runtime_state.get());
+        }
+    }
+
+    if (_push_down_expr) {
+        _push_down_expr->close(_runtime_state.get());
+    }
+
+    for (auto& [k, v] : _slot_id_to_filter_conjuncts) {
+        for (auto& ctx : v) {
+            if (ctx != nullptr) {
+                ctx->close(_runtime_state.get());
+            }
+        }
+    }
+
+    for (auto* ctx : _not_single_slot_filter_conjuncts) {
+        if (ctx != nullptr) {
+            ctx->close(_runtime_state.get());
+        }
+    }
+    return Status::OK();
+}
+
+Status PushBrokerReader::_init_src_block() {
+    _src_block.clear();
+    int idx = 0;
+    for (auto& slot : _src_slot_descs) {
+        vectorized::DataTypePtr data_type;
+        auto it = _name_to_col_type.find(slot->col_name());
+        if (it == _name_to_col_type.end() || _is_dynamic_schema) {
+            // not exist in file, using type from _input_tuple_desc
+            data_type = vectorized::DataTypeFactory::instance().create_data_type(
+                    slot->type(), slot->is_nullable());
+        } else {
+            data_type = vectorized::DataTypeFactory::instance().create_data_type(it->second, true);
+        }
+        if (data_type == nullptr) {
+            return Status::NotSupported("Not support data type {} for column {}",
+                                        it == _name_to_col_type.end() ? slot->type().debug_string()
+                                                                      : it->second.debug_string(),
+                                        slot->col_name());
+        }
+        vectorized::MutableColumnPtr data_column = data_type->create_column();
+        _src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type,
+                                                            slot->col_name()));
+        _src_block_name_to_idx.emplace(slot->col_name(), idx++);
+    }
+    _src_block_ptr = &_src_block;
+    return Status::OK();
+}
+
+Status PushBrokerReader::_cast_to_input_block() {

Review Comment:
   warning: method '_cast_to_input_block' can be made const [readability-make-member-function-const]

   ```suggestion
   Status PushBrokerReader::_cast_to_input_block() const {
   ```

   be/src/olap/push_handler.h:105:
   ```diff
   -     Status _cast_to_input_block();
   +     Status _cast_to_input_block() const;
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org