This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 61ff95e34564e28d2963f17402ce1150c18322e3
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Wed Oct 25 11:04:28 2023 +0800

    [improvement](routine-load) add routine load rows check (#25818)
---
 be/src/runtime/routine_load/data_consumer_group.cpp        | 3 ++-
 be/src/runtime/routine_load/data_consumer_group.h          | 6 ++++++
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 9 +++++++++
 3 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 27eea942664..9274ccdb419 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -125,9 +125,10 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
     bool eos = false;
     while (true) {
         if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
+            _rows = ctx->max_batch_rows - left_rows;
             LOG(INFO) << "consumer group done: " << _grp_id
                       << ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
-                      << ", received rows=" << ctx->max_batch_rows - left_rows
+                      << ", received rows=" << _rows
                       << ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
                       << ", left_time: " << left_time << ", left_rows: " << left_rows
                       << ", left_bytes: " << left_bytes
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e15ad7115f6..0cda80a9ec4 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -60,6 +60,10 @@ public:
         ++_counter;
     }

+    int64_t get_consumer_rows() const { return _rows; }
+
+    void set_consumer_rows(int64_t rows) { _rows = rows; }
+
     // start all consumers
     virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
                              std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
@@ -77,6 +81,8 @@ protected:
     // when the counter becomes zero, shutdown the queue to finish
     std::mutex _mutex;
     int _counter;
+    // received total rows
+    int64_t _rows {0};
 };

 // for kafka
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 57a847515b9..d8a0dc6a289 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -352,6 +352,15 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // wait for all consumers finished
     HANDLE_ERROR(ctx->future.get(), "consume failed");

+    // check received and load rows
+    LOG(INFO) << "routine load task received rows: " << consumer_grp.get()->get_consumer_rows()
+              << " load total rows: " << ctx.get()->number_total_rows
+              << " loaded rows: " << ctx.get()->number_loaded_rows
+              << " filtered rows: " << ctx.get()->number_filtered_rows
+              << " unselected rows: " << ctx.get()->number_unselected_rows;
+    DCHECK(consumer_grp.get()->get_consumer_rows() == ctx.get()->number_total_rows);
+    consumer_grp.get()->set_consumer_rows(0);
+
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

     // return the consumer back to pool

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
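The essence of the change is a consistency check between what the Kafka consumers received (max_batch_rows - left_rows, stashed in _rows when consumption finishes) and what the load pipeline accounted for (number_total_rows). Below is a minimal, self-contained sketch of that check, using hypothetical stand-ins (LoadContext, ConsumerGroup) for Doris's StreamLoadContext and KafkaDataConsumerGroup; it is an illustration of the pattern, not Doris code.

    // Minimal sketch (not Doris code) of the rows-consistency check.
    #include <cassert>
    #include <cstdint>
    #include <iostream>

    struct LoadContext {
        int64_t number_total_rows = 0;      // rows accounted for by the load pipeline
        int64_t number_loaded_rows = 0;
        int64_t number_filtered_rows = 0;
        int64_t number_unselected_rows = 0;
    };

    class ConsumerGroup {
    public:
        // Set once consumption finishes: max_batch_rows - left_rows.
        void set_consumer_rows(int64_t rows) { _rows = rows; }
        int64_t get_consumer_rows() const { return _rows; }

    private:
        int64_t _rows{0}; // received total rows
    };

    int main() {
        LoadContext ctx;
        ConsumerGroup grp;

        // Suppose the task allowed 200000 rows and finished with 199000 left:
        const int64_t max_batch_rows = 200000;
        const int64_t left_rows = 199000;
        grp.set_consumer_rows(max_batch_rows - left_rows); // 1000 rows received

        // The load pipeline accounted for the same 1000 rows.
        ctx.number_total_rows = 1000;

        // The check: everything the consumers received must be accounted for.
        assert(grp.get_consumer_rows() == ctx.number_total_rows);
        grp.set_consumer_rows(0); // reset so a reused group starts clean

        std::cout << "received rows match total rows\n";
        return 0;
    }

In the actual commit, DCHECK makes this a debug-build assertion: release builds only log the counters, while the reset of _rows matters because the consumer group is returned to a pool and reused by later tasks.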