This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2112844db35504d62490c5ac11c465013a35c92d Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Fri Oct 13 15:47:16 2023 +0800 [fix](multi-table) fix single stream multi table load can not finish (#25379) --- be/src/runtime/routine_load/data_consumer_group.cpp | 7 ++----- be/src/runtime/routine_load/data_consumer_group.h | 9 +++++++-- be/src/runtime/routine_load/routine_load_task_executor.cpp | 6 +++++- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index d27c86fdf01..27eea942664 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -25,7 +25,6 @@ #include <utility> #include "common/logging.h" -#include "io/fs/kafka_consumer_pipe.h" #include "librdkafka/rdkafkacpp.h" #include "runtime/routine_load/data_consumer.h" #include "runtime/stream_load/stream_load_context.h" @@ -72,7 +71,8 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() { DCHECK(_queue.get_size() == 0); } -Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) { +Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx, + std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) { Status result_st = Status::OK(); // start all consumers for (auto& consumer : _consumers) { @@ -105,9 +105,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) int64_t left_rows = ctx->max_batch_rows; int64_t left_bytes = ctx->max_batch_size; - std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe = - std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink); - LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << left_time << ", batch rows: " << left_rows << ", batch size: " << left_bytes << ". 
" << ctx->brief(); diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h index e7be39f5a69..e15ad7115f6 100644 --- a/be/src/runtime/routine_load/data_consumer_group.h +++ b/be/src/runtime/routine_load/data_consumer_group.h @@ -25,6 +25,7 @@ #include <vector> #include "common/status.h" +#include "io/fs/kafka_consumer_pipe.h" #include "runtime/routine_load/data_consumer.h" #include "util/blocking_queue.hpp" #include "util/uid_util.h" @@ -60,7 +61,10 @@ public: } // start all consumers - virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return Status::OK(); } + virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx, + std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) { + return Status::OK(); + } protected: UniqueId _grp_id; @@ -82,7 +86,8 @@ public: virtual ~KafkaDataConsumerGroup(); - Status start_all(std::shared_ptr<StreamLoadContext> ctx) override; + Status start_all(std::shared_ptr<StreamLoadContext> ctx, + std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override; // assign topic partitions to all consumers equally Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx); diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 023ebbe55f7..57a847515b9 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -333,8 +333,11 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, #endif } + std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe = + std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink); + // start to consume, this may block a while - HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed"); + HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed"); if (ctx->is_multi_table) { // plan the rest of unplanned data @@ -343,6 +346,7 @@ 
void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx, "multi tables task executes plan error"); // need memory order multi_table_pipe->set_consume_finished(); + HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed"); } // wait for all consumers finished --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org