yiguolei commented on code in PR #48368:
URL: https://github.com/apache/doris/pull/48368#discussion_r1976233533


##########
be/src/vec/sink/vmysql_result_writer.cpp:
##########
@@ -70,27 +72,64 @@
 #include "vec/exprs/vexpr_context.h"
 #include "vec/runtime/vdatetime_value.h"
 
-namespace doris {
+namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-namespace vectorized {
+
+void GetResultBatchCtx::on_failure(const Status& status) {
+    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
+    status.to_protobuf(_result->mutable_status());
+    _done->Run();
+}
+
+void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) {
+    Status status;
+    status.to_protobuf(_result->mutable_status());
+    PQueryStatistics* statistics = _result->mutable_query_statistics();
+    statistics->set_returned_rows(returned_rows);
+    _result->set_packet_seq(packet_seq);
+    _result->set_eos(true);
+    _done->Run();
+}
+
+Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& 
t_result,
+                                  int64_t packet_seq, ResultBlockBufferBase* 
buffer) {
+    Status st = Status::OK();
+    if (t_result != nullptr) {
+        uint8_t* buf = nullptr;
+        uint32_t len = 0;
+        ThriftSerializer ser(false, 4096);
+        RETURN_IF_ERROR(ser.serialize(&t_result->result_batch, &len, &buf));
+        _result->set_row_batch(std::string((const char*)buf, len));
+    } else {
+        _result->clear_row_batch();
+        _result->set_empty_batch(true);
+    }
+    _result->set_packet_seq(packet_seq);
+    _result->set_eos(false);
+
+    /// The size limit of proto buffer message is 2G
+    if (_result->ByteSizeLong() > _max_msg_size) {
+        st = Status::InternalError("Message size exceeds 2GB: {}", 
_result->ByteSizeLong());
+        _result->clear_row_batch();
+        _result->set_empty_batch(true);
+    }
+    st.to_protobuf(_result->mutable_status());
+    _done->Run();
+    return Status::OK();
+}
 
 template <bool is_binary_format>
-VMysqlResultWriter<is_binary_format>::VMysqlResultWriter(BufferControlBlock* 
sinker,
-                                                         const 
VExprContextSPtrs& output_vexpr_ctxs,
-                                                         RuntimeProfile* 
parent_profile)
+VMysqlResultWriter<is_binary_format>::VMysqlResultWriter(
+        std::shared_ptr<ResultBlockBufferBase> sinker, const 
VExprContextSPtrs& output_vexpr_ctxs,
+        RuntimeProfile* parent_profile)
         : ResultWriter(),
-          _sinker(sinker),
+          _sinker(std::dynamic_pointer_cast<NormalResultBlockBuffer>(sinker)),

Review Comment:
   这里使用assert cast,并且把检查开关打开



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to