This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 61ff95e34564e28d2963f17402ce1150c18322e3
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Wed Oct 25 11:04:28 2023 +0800

    [improvement](routine-load) add routine load rows check (#25818)
---
 be/src/runtime/routine_load/data_consumer_group.cpp        | 3 ++-
 be/src/runtime/routine_load/data_consumer_group.h          | 6 ++++++
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 9 +++++++++
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 27eea942664..9274ccdb419 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -125,9 +125,10 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
     bool eos = false;
     while (true) {
         if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
+            _rows = ctx->max_batch_rows - left_rows;
             LOG(INFO) << "consumer group done: " << _grp_id
                       << ". consume time(ms)=" << ctx->max_interval_s * 1000 - 
left_time
-                      << ", received rows=" << ctx->max_batch_rows - left_rows
+                      << ", received rows=" << _rows
                       << ", received bytes=" << ctx->max_batch_size - 
left_bytes << ", eos: " << eos
                       << ", left_time: " << left_time << ", left_rows: " << 
left_rows
                       << ", left_bytes: " << left_bytes
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e15ad7115f6..0cda80a9ec4 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -60,6 +60,10 @@ public:
         ++_counter;
     }
 
+    int64_t get_consumer_rows() const { return _rows; }
+
+    void set_consumer_rows(int64_t rows) { _rows = rows; }
+
     // start all consumers
     virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
                              std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
@@ -77,6 +81,8 @@ protected:
     // when the counter becomes zero, shutdown the queue to finish
     std::mutex _mutex;
     int _counter;
+    // received total rows
+    int64_t _rows {0};
 };
 
 // for kafka
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 57a847515b9..d8a0dc6a289 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -352,6 +352,15 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // wait for all consumers finished
     HANDLE_ERROR(ctx->future.get(), "consume failed");
 
+    // check received and load rows
+    LOG(INFO) << "routine load task received rows: " << 
consumer_grp.get()->get_consumer_rows()
+              << " load total rows: " << ctx.get()->number_total_rows
+              << " loaded rows: " << ctx.get()->number_loaded_rows
+              << " filtered rows: " << ctx.get()->number_filtered_rows
+              << " unselected rows: " << ctx.get()->number_unselected_rows;
+    DCHECK(consumer_grp.get()->get_consumer_rows() == 
ctx.get()->number_total_rows);
+    consumer_grp.get()->set_consumer_rows(0);
+
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
 
     // return the consumer back to pool
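
For context on the new check: start_all() now records the number of rows the
consumer group received in _rows, and exec_task() asserts via DCHECK (so in
debug builds) that this count matches ctx->number_total_rows before resetting
the counter for the pooled group's next task. Below is a minimal standalone
sketch of that invariant; ConsumerGroup and LoadContext are simplified
stand-ins for the actual Doris classes, and the accounting
total = loaded + filtered + unselected is an assumption inferred from the
counters printed in the log line above.

    #include <cassert>
    #include <cstdint>
    #include <iostream>

    // Simplified stand-in for KafkaDataConsumerGroup's new row counter.
    struct ConsumerGroup {
        int64_t rows {0};  // rows received from Kafka, set in start_all()
    };

    // Simplified stand-in for StreamLoadContext's row accounting.
    struct LoadContext {
        int64_t number_loaded_rows {0};
        int64_t number_filtered_rows {0};
        int64_t number_unselected_rows {0};
        int64_t number_total_rows {0};  // assumed: loaded + filtered + unselected
    };

    int main() {
        ConsumerGroup grp;
        LoadContext ctx;

        // Suppose the consumers received 100 rows in this task...
        grp.rows = 100;

        // ...and the load pipeline accounted for every one of them.
        ctx.number_loaded_rows = 90;
        ctx.number_filtered_rows = 7;
        ctx.number_unselected_rows = 3;
        ctx.number_total_rows = ctx.number_loaded_rows + ctx.number_filtered_rows +
                                ctx.number_unselected_rows;

        // The invariant the commit checks with DCHECK: every received row
        // must land in exactly one of the three buckets.
        assert(grp.rows == ctx.number_total_rows);
        std::cout << "received=" << grp.rows
                  << ", total=" << ctx.number_total_rows << " -> consistent\n";

        // Reset the counter, mirroring set_consumer_rows(0), so the consumer
        // group returned to the pool starts clean for the next task.
        grp.rows = 0;
        return 0;
    }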

