This is an automated email from the ASF dual-hosted git repository. liaoxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b210e8f3a61 [fix](routine-load) fix routine load pause when Kafka data deleted after TTL (#37288) b210e8f3a61 is described below commit b210e8f3a6190bf1a85504a2e6b8b03306488225 Author: hui lai <1353307...@qq.com> AuthorDate: Mon Jul 8 17:55:21 2024 +0800 [fix](routine-load) fix routine load pause when Kafka data deleted after TTL (#37288) When using routine load, after the data load is completed, the lag is still a positive number: ``` Lag: {"0":16,"1":15,"2":16,"3":16,"4":16,"5":16,"6":15,"7":16,"8":16,"9":16,"10":15,"11":16,"12":15,"13":15,"14":16,"15":16,"16":17,"17":15,"18":16,"19":15,"20":16,"21":16,"22":16,"23":16,"24":15,"25":17,"26":17,"27":16,"28":16,"29":16,"30":16,"31":17,"32":14,"33":16,"34":17,"35":16,"36":15,"37":15,"38":15,"39":16,"40":16,"41":16,"42":15,"43":15,"44":17,"45":16,"46":15,"47":15,"48":16,"49":17,"50":16,"51":15,"52":16,"53":15,"54":15,"55":17,"56":16,"57":17,"58":16,"59":16,"60":15,"61 [...] ``` and the routine load is paused when the Kafka data reaches its TTL and is deleted; the error is `out of range`. The reason this happened is that an EOF message has its own offset, which needs to be included in the offset statistics. **note(important):** After the bug is fixed, if you set ``` "property.enable.partition.eof" = "false" ``` in your routine load job, you will still encounter this problem, because EOF messages carry offsets, and this config is true by default in Doris. 
--- be/src/runtime/routine_load/data_consumer.cpp | 7 ++++- .../runtime/routine_load/data_consumer_group.cpp | 36 +++++++++++++--------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index e61e1fdc2e0..92840721581 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -264,8 +264,13 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue, LOG(INFO) << "consumer meet partition eof: " << _id << " partition offset: " << msg->offset(); _consuming_partition_ids.erase(msg->partition()); - if (_consuming_partition_ids.size() <= 0) { + if (!queue->blocking_put(msg.get())) { done = true; + } else if (_consuming_partition_ids.size() <= 0) { + msg.release(); + done = true; + } else { + msg.release(); } break; } diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 8d07b0ec81a..fc714fab6e0 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -164,22 +164,28 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx, << ", partition: " << msg->partition() << ", offset: " << msg->offset() << ", len: " << msg->len(); - Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()), - static_cast<size_t>(msg->len())); - if (st.ok()) { - left_rows--; - left_bytes -= msg->len(); - cmt_offset[msg->partition()] = msg->offset(); - VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset() - << "]"; + if (msg->err() == RdKafka::ERR__PARTITION_EOF) { + if (msg->offset() > 0) { + cmt_offset[msg->partition()] = msg->offset() - 1; + } } else { - // failed to append this msg, we must stop - LOG(WARNING) << "failed to append msg to pipe. 
grp: " << _grp_id; - eos = true; - { - std::unique_lock<std::mutex> lock(_mutex); - if (result_st.ok()) { - result_st = st; + Status st = (kafka_pipe.get()->*append_data)( + static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len())); + if (st.ok()) { + left_rows--; + left_bytes -= msg->len(); + cmt_offset[msg->partition()] = msg->offset(); + VLOG_NOTICE << "consume partition[" << msg->partition() << " - " + << msg->offset() << "]"; + } else { + // failed to append this msg, we must stop + LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; + eos = true; + { + std::unique_lock<std::mutex> lock(_mutex); + if (result_st.ok()) { + result_st = st; + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org