xinyiZzz commented on code in PR #43281:
URL: https://github.com/apache/doris/pull/43281#discussion_r1832197362


##########
be/src/runtime/buffer_control_block.cpp:
##########
@@ -191,56 +269,115 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* 
ctx) {
 
         ctx->on_data(result, _packet_num);
         _packet_num++;
-        _update_dependency();
         return;
     }
     if (_is_close) {
         ctx->on_close(_packet_num, _query_statistics.get());
-        _update_dependency();
         return;
     }
     // no ready data, push ctx to waiting list
     _waiting_rpc.push_back(ctx);
-    _update_dependency();
 }
 
-Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* 
result) {
+Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* 
result,
+                                           cctz::time_zone& timezone_obj) {
     std::unique_lock<std::mutex> l(_lock);
+    Defer defer {[&]() { _update_dependency(); }};
     if (!_status.ok()) {
         return _status;
     }
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");
     }
 
-    while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
-        _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+    while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && 
!_is_close) {
+        _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
     }
 
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");
     }
 
-    if (!_arrow_flight_batch_queue.empty()) {
-        *result = std::move(_arrow_flight_batch_queue.front());
-        _arrow_flight_batch_queue.pop_front();
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        *result = std::move(_arrow_flight_result_batch_queue.front());
+        _arrow_flight_result_batch_queue.pop_front();
+        timezone_obj = _timezone_obj;
+
         for (auto it : _instance_rows_in_queue.front()) {
             _instance_rows[it.first] -= it.second;
         }
         _instance_rows_in_queue.pop_front();
         _packet_num++;
-        _update_dependency();
         return Status::OK();
     }
 
     // normal path end
     if (_is_close) {
-        _update_dependency();
+#ifndef NDEBUG
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        LOG(INFO) << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, 
is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+#endif
         return Status::OK();
     }
     return Status::InternalError("Get Arrow Batch Abnormal Ending");
 }
 
+void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
+    std::unique_lock<std::mutex> l(_lock);
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    Defer defer {[&]() { _update_dependency(); }};
+    if (!_status.ok()) {
+        ctx->on_failure(_status);
+        return;
+    }
+    if (_is_cancelled) {
+        ctx->on_failure(Status::Cancelled("Cancelled"));
+        return;
+    }
+
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        auto block = _arrow_flight_result_batch_queue.front();
+        _arrow_flight_result_batch_queue.pop_front();
+        for (auto it : _instance_rows_in_queue.front()) {
+            _instance_rows[it.first] -= it.second;
+        }
+        _instance_rows_in_queue.pop_front();
+
+        ctx->on_data(block, _packet_num, _be_exec_version, 
_fragement_transmission_compression_type,
+                     _timezone, _arrow_schema_field_names, 
_serialize_batch_ns_timer,
+                     _uncompressed_bytes_counter, _compressed_bytes_counter);
+        _packet_num++;
+        return;
+    }
+
+    // normal path end
+    if (_is_close) {
+        ctx->on_close(_packet_num);
+#ifndef NDEBUG
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        LOG(INFO) << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, 
is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+#endif
+        return;
+    }
+    // no ready data, push ctx to waiting list
+    _waiting_arrow_result_batch_rpc.push_back(ctx);
+}
+
+void BufferControlBlock::register_arrow_schema(const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
+    _arrow_schema = arrow_schema;
+    _arrow_schema_field_names = join(_arrow_schema->field_names(), ",");

Review Comment:
   schema内部的嵌套机构没问题,只是把 schema 最外层用于展示的表头传到对端



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to