This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b210e8f3a61 [fix](routine-load) fix routine load pause when Kafka data deleted after TTL (#37288)
b210e8f3a61 is described below

commit b210e8f3a6190bf1a85504a2e6b8b03306488225
Author: hui lai <1353307...@qq.com>
AuthorDate: Mon Jul 8 17:55:21 2024 +0800

    [fix](routine-load) fix routine load pause when Kafka data deleted after TTL (#37288)
    
    When using routine load, after the data load is completed, the reported
    lag is still a positive number:
    ```
      Lag: 
{"0":16,"1":15,"2":16,"3":16,"4":16,"5":16,"6":15,"7":16,"8":16,"9":16,"10":15,"11":16,"12":15,"13":15,"14":16,"15":16,"16":17,"17":15,"18":16,"19":15,"20":16,"21":16,"22":16,"23":16,"24":15,"25":17,"26":17,"27":16,"28":16,"29":16,"30":16,"31":17,"32":14,"33":16,"34":17,"35":16,"36":15,"37":15,"38":15,"39":16,"40":16,"41":16,"42":15,"43":15,"44":17,"45":16,"46":15,"47":15,"48":16,"49":17,"50":16,"51":15,"52":16,"53":15,"54":15,"55":17,"56":16,"57":17,"58":16,"59":16,"60":15,"61
 [...]
    ```
    and the routine load job is paused when the Kafka data reaches its TTL
    and is deleted; the error is `out of range`.
    
    This happens because the partition EOF event carries its own offset,
    which also needs to be counted in the offset statistics.
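
    For illustration only (this is not the Doris code), here is a minimal
    sketch of how a librdkafka consumer surfaces the partition EOF event
    together with its offset, and how the per-partition committed offset can
    be derived from it; the `poll_once` helper and the 1000 ms timeout are
    made up for the example:
    ```
    #include <librdkafka/rdkafkacpp.h>

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <memory>

    // Sketch: consume one event from an already-subscribed consumer and keep
    // per-partition offset statistics the same way the fix below does.
    void poll_once(RdKafka::KafkaConsumer* consumer,
                   std::map<int32_t, int64_t>& cmt_offset) {
        std::unique_ptr<RdKafka::Message> msg(consumer->consume(1000 /* ms */));
        if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
            // The EOF event's offset points one past the last message in the
            // partition, so the last consumed offset is offset() - 1.
            if (msg->offset() > 0) {
                cmt_offset[msg->partition()] = msg->offset() - 1;
            }
        } else if (msg->err() == RdKafka::ERR_NO_ERROR) {
            // A normal message: its own offset is the committed offset.
            cmt_offset[msg->partition()] = msg->offset();
        } else {
            std::cerr << "consume error: " << msg->errstr() << std::endl;
        }
    }
    ```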
    
    **Note (important):**
    After the bug is fixed, if you set
    ```
    "property.enable.partition.eof" = "false"
    ```
    in your routine load job, you will hit this problem again, because the
    fix relies on the offset carried by the EOF event, and this config is
    `true` by default in Doris.
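
    For reference, a minimal sketch (again, not the Doris code) of how such a
    `property.*` setting maps onto the librdkafka consumer configuration; the
    broker address and group id below are placeholders:
    ```
    #include <librdkafka/rdkafkacpp.h>

    #include <iostream>
    #include <memory>

    int main() {
        std::string errstr;
        std::unique_ptr<RdKafka::Conf> conf(
                RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

        // "property.enable.partition.eof" in a routine load job corresponds
        // to this librdkafka setting; keeping it "true" makes the consumer
        // emit ERR__PARTITION_EOF events, which the fix relies on.
        if (conf->set("enable.partition.eof", "true", errstr) !=
                RdKafka::Conf::CONF_OK) {
            std::cerr << "set enable.partition.eof failed: " << errstr << std::endl;
            return 1;
        }
        conf->set("bootstrap.servers", "localhost:9092", errstr);  // placeholder
        conf->set("group.id", "example_group", errstr);            // placeholder

        std::unique_ptr<RdKafka::KafkaConsumer> consumer(
                RdKafka::KafkaConsumer::create(conf.get(), errstr));
        if (!consumer) {
            std::cerr << "create consumer failed: " << errstr << std::endl;
            return 1;
        }
        consumer->close();
        return 0;
    }
    ```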
---
 be/src/runtime/routine_load/data_consumer.cpp      |  7 ++++-
 .../runtime/routine_load/data_consumer_group.cpp   | 36 +++++++++++++---------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index e61e1fdc2e0..92840721581 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -264,8 +264,13 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             LOG(INFO) << "consumer meet partition eof: " << _id
                       << " partition offset: " << msg->offset();
             _consuming_partition_ids.erase(msg->partition());
-            if (_consuming_partition_ids.size() <= 0) {
+            if (!queue->blocking_put(msg.get())) {
                 done = true;
+            } else if (_consuming_partition_ids.size() <= 0) {
+                msg.release();
+                done = true;
+            } else {
+                msg.release();
             }
             break;
         }
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 8d07b0ec81a..fc714fab6e0 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -164,22 +164,28 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                        << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                         << ", len: " << msg->len();
 
-            Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
-                                                         static_cast<size_t>(msg->len()));
-            if (st.ok()) {
-                left_rows--;
-                left_bytes -= msg->len();
-                cmt_offset[msg->partition()] = msg->offset();
-                VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset()
-                            << "]";
+            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
+                if (msg->offset() > 0) {
+                    cmt_offset[msg->partition()] = msg->offset() - 1;
+                }
             } else {
-                // failed to append this msg, we must stop
-                LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
-                eos = true;
-                {
-                    std::unique_lock<std::mutex> lock(_mutex);
-                    if (result_st.ok()) {
-                        result_st = st;
+                Status st = (kafka_pipe.get()->*append_data)(
+                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
+                if (st.ok()) {
+                    left_rows--;
+                    left_bytes -= msg->len();
+                    cmt_offset[msg->partition()] = msg->offset();
+                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
+                                << msg->offset() << "]";
+                } else {
+                    // failed to append this msg, we must stop
+                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
+                    eos = true;
+                    {
+                        std::unique_lock<std::mutex> lock(_mutex);
+                        if (result_st.ok()) {
+                            result_st = st;
+                        }
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
