yiguolei commented on code in PR #48368: URL: https://github.com/apache/doris/pull/48368#discussion_r1976233533
########## be/src/vec/sink/vmysql_result_writer.cpp: ########## @@ -70,27 +72,64 @@ #include "vec/exprs/vexpr_context.h" #include "vec/runtime/vdatetime_value.h" -namespace doris { +namespace doris::vectorized { #include "common/compile_check_begin.h" -namespace vectorized { + +void GetResultBatchCtx::on_failure(const Status& status) { + DCHECK(!status.ok()) << "status is ok, errmsg=" << status; + status.to_protobuf(_result->mutable_status()); + _done->Run(); +} + +void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) { + Status status; + status.to_protobuf(_result->mutable_status()); + PQueryStatistics* statistics = _result->mutable_query_statistics(); + statistics->set_returned_rows(returned_rows); + _result->set_packet_seq(packet_seq); + _result->set_eos(true); + _done->Run(); +} + +Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& t_result, + int64_t packet_seq, ResultBlockBufferBase* buffer) { + Status st = Status::OK(); + if (t_result != nullptr) { + uint8_t* buf = nullptr; + uint32_t len = 0; + ThriftSerializer ser(false, 4096); + RETURN_IF_ERROR(ser.serialize(&t_result->result_batch, &len, &buf)); + _result->set_row_batch(std::string((const char*)buf, len)); + } else { + _result->clear_row_batch(); + _result->set_empty_batch(true); + } + _result->set_packet_seq(packet_seq); + _result->set_eos(false); + + /// The size limit of proto buffer message is 2G + if (_result->ByteSizeLong() > _max_msg_size) { + st = Status::InternalError("Message size exceeds 2GB: {}", _result->ByteSizeLong()); + _result->clear_row_batch(); + _result->set_empty_batch(true); + } + st.to_protobuf(_result->mutable_status()); + _done->Run(); + return Status::OK(); +} template <bool is_binary_format> -VMysqlResultWriter<is_binary_format>::VMysqlResultWriter(BufferControlBlock* sinker, - const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile) +VMysqlResultWriter<is_binary_format>::VMysqlResultWriter( + std::shared_ptr<ResultBlockBufferBase> sinker, const VExprContextSPtrs& output_vexpr_ctxs, + RuntimeProfile* parent_profile) : ResultWriter(), - _sinker(sinker), + _sinker(std::dynamic_pointer_cast<NormalResultBlockBuffer>(sinker)), Review Comment: 这里使用assert cast,并且把检查开关打开 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org