This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2112844db35504d62490c5ac11c465013a35c92d
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Oct 13 15:47:16 2023 +0800

    [fix](multi-table) fix single stream multi table load can not finish (#25379)
---
 be/src/runtime/routine_load/data_consumer_group.cpp        | 7 ++-----
 be/src/runtime/routine_load/data_consumer_group.h          | 9 +++++++--
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 6 +++++-
 3 files changed, 14 insertions(+), 8 deletions(-)
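
In short: KafkaDataConsumerGroup::start_all() used to derive the
KafkaConsumerPipe from ctx->body_sink internally, so the task executor held no
handle it could close once a single stream multi table task stopped consuming,
and the load could never finish. The patch hoists the cast into
RoutineLoadTaskExecutor::exec_task(), passes the pipe into start_all(), and
finishes it explicitly. A condensed sketch of the resulting control flow
(names taken from the diff below; HANDLE_ERROR and the surrounding setup are
as in the existing file):

    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);

    // Consume; this may block for a while.
    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");

    if (ctx->is_multi_table) {
        multi_table_pipe->set_consume_finished();
        // New: mark end-of-stream, otherwise pipe readers wait forever.
        HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
    }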

diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index d27c86fdf01..27eea942664 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -25,7 +25,6 @@
 #include <utility>
 
 #include "common/logging.h"
-#include "io/fs/kafka_consumer_pipe.h"
 #include "librdkafka/rdkafkacpp.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "runtime/stream_load/stream_load_context.h"
@@ -72,7 +71,8 @@ KafkaDataConsumerGroup::~KafkaDataConsumerGroup() {
     DCHECK(_queue.get_size() == 0);
 }
 
-Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx) {
+Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
+                                         std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
     Status result_st = Status::OK();
     // start all consumers
     for (auto& consumer : _consumers) {
@@ -105,9 +105,6 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx)
     int64_t left_rows = ctx->max_batch_rows;
     int64_t left_bytes = ctx->max_batch_size;
 
-    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
-            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
-
     LOG(INFO) << "start consumer group: " << _grp_id << ". max time(ms): " << 
left_time
               << ", batch rows: " << left_rows << ", batch size: " << 
left_bytes << ". "
               << ctx->brief();
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index e7be39f5a69..e15ad7115f6 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "io/fs/kafka_consumer_pipe.h"
 #include "runtime/routine_load/data_consumer.h"
 #include "util/blocking_queue.hpp"
 #include "util/uid_util.h"
@@ -60,7 +61,10 @@ public:
     }
 
     // start all consumers
-    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx) { return Status::OK(); }
+    virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                             std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
+        return Status::OK();
+    }
 
 protected:
     UniqueId _grp_id;
@@ -82,7 +86,8 @@ public:
 
     virtual ~KafkaDataConsumerGroup();
 
-    Status start_all(std::shared_ptr<StreamLoadContext> ctx) override;
+    Status start_all(std::shared_ptr<StreamLoadContext> ctx,
+                     std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) override;
     // assign topic partitions to all consumers equally
     Status assign_topic_partitions(std::shared_ptr<StreamLoadContext> ctx);
 
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 023ebbe55f7..57a847515b9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -333,8 +333,11 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
 #endif
     }
 
+    std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
+            std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
+
     // start to consume, this may block a while
-    HANDLE_ERROR(consumer_grp->start_all(ctx), "consuming failed");
+    HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
 
     if (ctx->is_multi_table) {
         // plan the rest of unplanned data
@@ -343,6 +346,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
                      "multi tables task executes plan error");
         // need memory order
         multi_table_pipe->set_consume_finished();
+        HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
     }
 
     // wait for all consumers finished
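
For context, a minimal standalone sketch of the pipe pattern the fix relies on
(hypothetical MiniPipe class, illustrative only, not Doris code): a reader
blocked on an empty pipe wakes up only when the writer appends data or marks
the pipe finished; omitting the finish() call is exactly the hang this commit
fixes.

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>

    // Toy stand-in for a stream-load pipe.
    class MiniPipe {
    public:
        void append(std::string chunk) {
            std::lock_guard<std::mutex> lk(_mu);
            _chunks.push(std::move(chunk));
            _cv.notify_one();
        }
        // Mark end-of-stream: wakes blocked readers so they can drain and exit.
        void finish() {
            std::lock_guard<std::mutex> lk(_mu);
            _finished = true;
            _cv.notify_all();
        }
        // Blocks until a chunk arrives or the pipe is finished.
        // Returns false once the stream has ended and all data is drained.
        bool read(std::string* out) {
            std::unique_lock<std::mutex> lk(_mu);
            _cv.wait(lk, [&] { return !_chunks.empty() || _finished; });
            if (_chunks.empty()) return false;
            *out = std::move(_chunks.front());
            _chunks.pop();
            return true;
        }

    private:
        std::mutex _mu;
        std::condition_variable _cv;
        std::queue<std::string> _chunks;
        bool _finished = false;
    };

    int main() {
        MiniPipe pipe;
        std::thread reader([&] {
            std::string chunk;
            while (pipe.read(&chunk)) std::cout << "got: " << chunk << "\n";
            std::cout << "reader done\n";  // reached only because finish() runs
        });
        pipe.append("row1");
        pipe.append("row2");
        pipe.finish();  // without this call, reader.join() would hang forever
        reader.join();
        return 0;
    }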


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
