github-actions[bot] commented on code in PR #44244:
URL: https://github.com/apache/doris/pull/44244#discussion_r1847630172


##########
be/src/runtime/buffer_control_block.h:
##########
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <arrow/type.h>

Review Comment:
   warning: 'arrow/type.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <arrow/type.h>
            ^
   ```
   



##########
be/src/runtime/buffer_control_block.cpp:
##########
@@ -191,54 +267,132 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* 
ctx) {
 
         ctx->on_data(result, _packet_num);
         _packet_num++;
-        _update_dependency();
         return;
     }
     if (_is_close) {
         ctx->on_close(_packet_num, _query_statistics.get());
-        _update_dependency();
         return;
     }
     // no ready data, push ctx to waiting list
     _waiting_rpc.push_back(ctx);
-    _update_dependency();
 }
 
-Status 
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* 
result) {
+Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* 
result,
+                                           cctz::time_zone& timezone_obj) {
     std::unique_lock<std::mutex> l(_lock);
+    Defer defer {[&]() { _update_dependency(); }};
     if (!_status.ok()) {
         return _status;
     }
     if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
+        return Status::Cancelled(fmt::format("Cancelled ()", 
print_id(_fragment_id)));
     }
 
-    while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
-        _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+    while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && 
!_is_close) {
+        _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
     }
 
     if (_is_cancelled) {
-        return Status::Cancelled("Cancelled");
+        return Status::Cancelled(fmt::format("Cancelled ()", 
print_id(_fragment_id)));
     }
 
-    if (!_arrow_flight_batch_queue.empty()) {
-        *result = std::move(_arrow_flight_batch_queue.front());
-        _arrow_flight_batch_queue.pop_front();
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        *result = std::move(_arrow_flight_result_batch_queue.front());
+        _arrow_flight_result_batch_queue.pop_front();
+        timezone_obj = _timezone_obj;
+
         for (auto it : _instance_rows_in_queue.front()) {
             _instance_rows[it.first] -= it.second;
         }
         _instance_rows_in_queue.pop_front();
         _packet_num++;
-        _update_dependency();
         return Status::OK();
     }
 
     // normal path end
     if (_is_close) {
-        _update_dependency();
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        VLOG_NOTICE << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, 
is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+        return Status::OK();
+    }
+    return Status::InternalError(
+            fmt::format("Get Arrow Batch Abnormal Ending ()", 
print_id(_fragment_id)));
+}
+
+void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
+    std::unique_lock<std::mutex> l(_lock);
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    Defer defer {[&]() { _update_dependency(); }};
+    if (!_status.ok()) {
+        ctx->on_failure(_status);
+        return;
+    }
+    if (_is_cancelled) {
+        ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ()", 
print_id(_fragment_id))));
+        return;
+    }
+
+    if (!_arrow_flight_result_batch_queue.empty()) {
+        auto block = _arrow_flight_result_batch_queue.front();
+        _arrow_flight_result_batch_queue.pop_front();
+        for (auto it : _instance_rows_in_queue.front()) {
+            _instance_rows[it.first] -= it.second;
+        }
+        _instance_rows_in_queue.pop_front();
+
+        ctx->on_data(block, _packet_num, _be_exec_version, 
_fragement_transmission_compression_type,
+                     _timezone, _serialize_batch_ns_timer, 
_uncompressed_bytes_counter,
+                     _compressed_bytes_counter);
+        _packet_num++;
+        return;
+    }
+
+    // normal path end
+    if (_is_close) {
+        ctx->on_close(_packet_num);
+        std::stringstream ss;
+        _profile.pretty_print(&ss);
+        VLOG_NOTICE << fmt::format(
+                "BufferControlBlock finished, fragment_id={}, is_close={}, 
is_cancelled={}, "
+                "packet_num={}, peak_memory_usage={}, profile={}",
+                print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+                _mem_tracker->peak_consumption(), ss.str());
+        return;
+    }
+    // no ready data, push ctx to waiting list
+    _waiting_arrow_result_batch_rpc.push_back(ctx);
+}
+
+void BufferControlBlock::register_arrow_schema(const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
+    std::lock_guard<std::mutex> l(_lock);
+    _arrow_schema = arrow_schema;
+}
+
+Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* 
arrow_schema) {

Review Comment:
   warning: method 'find_arrow_schema' can be made const [readability-make-member-function-const]
   
   ```suggestion
   Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* 
arrow_schema) const {
   ```
   



##########
be/src/service/arrow_flight/arrow_flight_batch_reader.h:
##########
@@ -17,40 +17,91 @@
 
 #pragma once
 
+#include <cctz/time_zone.h>

Review Comment:
   warning: 'cctz/time_zone.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <cctz/time_zone.h>
            ^
   ```
   



##########
be/src/util/arrow/row_batch.cpp:
##########
@@ -46,7 +46,8 @@ namespace doris {
 
 using strings::Substitute;
 
-Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::DataType>* result) {
+Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::DataType>* result,

Review Comment:
   warning: function 'convert_to_arrow_type' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::DataType>* result,
          ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/util/arrow/row_batch.cpp:48:** 108 lines including whitespace and comments (threshold 80)
   ```cpp
   Status convert_to_arrow_type(const TypeDescriptor& type, 
std::shared_ptr<arrow::DataType>* result,
          ^
   ```
   
   </details>
   



##########
be/src/runtime/result_buffer_mgr.h:
##########
@@ -17,7 +17,9 @@
 
 #pragma once
 
+#include <cctz/time_zone.h>

Review Comment:
   warning: 'cctz/time_zone.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <cctz/time_zone.h>
            ^
   ```
   



##########
be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:
##########
@@ -17,53 +17,294 @@
 
 #include "service/arrow_flight/arrow_flight_batch_reader.h"
 
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
 #include <arrow/status.h>
+#include <arrow/type.h>
+#include <gen_cpp/internal_service.pb.h>
 
-#include "arrow/builder.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/result_buffer_mgr.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/arrow/block_convertor.h"
 #include "util/arrow/row_batch.h"
 #include "util/arrow/utils.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
 
-namespace doris {
-namespace flight {
+namespace doris::flight {
 
-std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
-    return schema_;
+ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
+        const std::shared_ptr<QueryStatement>& statement)
+        : _statement(statement) {}
+
+std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
+    return _schema;
+}
+
+arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const 
std::string& msg) {
+    std::string status_msg =
+            fmt::format("ArrowFlightBatchReader {}, packet_seq={}, 
result={}:{}, finistId={}", msg,
+                        _packet_seq, _statement->result_addr.hostname, 
_statement->result_addr.port,
+                        print_id(_statement->query_id));
+    LOG(WARNING) << status_msg;
+    return arrow::Status::Invalid(status_msg);
 }
 
-ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> 
statement,
-                                               std::shared_ptr<arrow::Schema> 
schema)
-        : statement_(std::move(statement)), schema_(std::move(schema)) {}
+ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   ```cpp
   ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
                               ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to