This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 23bce90f3b8f172f4acb20390d5868c97aab148e
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Wed Apr 27 23:21:17 2022 +0800

    [fix](routine-load) Fix bug that new coming routine load tasks are rejected 
all the time and report TOO_MANY_TASK error (#9164)
    
    ```
    CREATE ROUTINE LOAD iaas.dws_nat ON dws_nat
    WITH APPEND PROPERTIES (
    "desired_concurrent_number"="2",
    "max_batch_interval" = "20",
    "max_batch_rows" = "400000",
    "max_batch_size" = "314572800",
    "format" = "json",
    "max_error_number" = "0"
    )
    FROM KAFKA (
    "kafka_broker_list" = "xxxx:xxxx",
    "kafka_topic" = "nat_nsq",
    "property.kafka_default_offsets" = "2022-04-19 13:20:00"
    );
    ```
    
    In the create statement example above, you can see that
    the user didn't specify custom partitions.
    As a result, (1) FE will fetch all Kafka partitions from the server in the
    routine load scheduler.
    The user set the default offset by datetime.
    Likewise, (2) FE will fetch the Kafka offsets by time from the server in the
    routine load scheduler.
    
    When step 1 succeeds but step 2 fails, the progress of this routine load
    may not contain any partitions or offsets.
    Nevertheless, since newCurrentKafkaPartition, which is fetched from the
    Kafka server, may always be equal to currentKafkaPartitions,
    the incorrect progress will never be updated.
---
 be/src/runtime/routine_load/data_consumer_pool.cpp |  6 ++
 .../routine_load/routine_load_task_executor.cpp    |  2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      | 90 +++++++++++++++-------
 3 files changed, 71 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp 
b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 5b0d9fabbe..5f039982a9 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -68,11 +68,17 @@ Status 
DataConsumerPool::get_consumer_grp(StreamLoadContext* ctx,
     }
     DCHECK(ctx->kafka_info);
 
+    if (ctx->kafka_info->begin_offset.size() == 0) {
+        return Status::InternalError(
+                "PAUSE: The size of begin_offset of task should not be 0.");
+    }
+
     std::shared_ptr<KafkaDataConsumerGroup> grp = 
std::make_shared<KafkaDataConsumerGroup>();
 
     // one data consumer group contains at least one data consumers.
     int max_consumer_num = config::max_consumer_num_per_group;
     size_t consumer_num = std::min((size_t)max_consumer_num, 
ctx->kafka_info->begin_offset.size());
+
     for (int i = 0; i < consumer_num; ++i) {
         std::shared_ptr<DataConsumer> consumer;
         RETURN_IF_ERROR(get_consumer(ctx, &consumer));
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 1c72a52eaa..cce1646b76 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -356,7 +356,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* 
ctx, DataConsumerPool
 
 void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const 
Status& st,
                                           const std::string& err_msg) {
-    LOG(WARNING) << err_msg;
+    LOG(WARNING) << err_msg << ", routine load task: " << ctx->brief(true);
     ctx->status = st;
     if (ctx->need_rollback) {
         _exec_env->stream_load_executor()->rollback_txn(ctx);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bd67bd1685..f85898f10b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -132,6 +132,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, timeZone);
     }
 
+    private long convertedDefaultOffsetToLong() {
+        if (this.kafkaDefaultOffSet.isEmpty()) {
+            return KafkaProgress.OFFSET_END_VAL;
+        } else {
+            if (isOffsetForTimes()) {
+                return convertedDefaultOffsetToTimestamp();
+            } else if 
(this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                return KafkaProgress.OFFSET_BEGINNING_VAL;
+            } else if 
(this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                return KafkaProgress.OFFSET_END_VAL;
+            } else {
+                return KafkaProgress.OFFSET_END_VAL;
+            }
+        }
+    }
+
     @Override
     public void prepare() throws UserException {
         super.prepare();
@@ -328,6 +344,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                         }
                         return true;
                     } else {
+                        // if the partitions of currentKafkaPartitions and 
progress are inconsistent,
+                        // We should also update the progress
+                        for (Integer kafkaPartition : currentKafkaPartitions) {
+                            if (!((KafkaProgress) 
progress).containsPartition(kafkaPartition)) {
+                                return true;
+                            }
+                        }
                         return false;
                     }
                 } else {
@@ -406,37 +429,52 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-    private void updateNewPartitionProgress() throws LoadException {
+    private void updateNewPartitionProgress() throws UserException {
         // update the progress of new partitions
-        for (Integer kafkaPartition : currentKafkaPartitions) {
-            if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) 
{
-                // if offset is not assigned, start from OFFSET_END
-                long beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                if (!kafkaDefaultOffSet.isEmpty()) {
-                    if (isOffsetForTimes()) {
-                        // get offset by time
-                        List<Pair<Integer, Long>> offsets = 
Lists.newArrayList();
-                        offsets.add(Pair.create(kafkaPartition, 
convertedDefaultOffsetToTimestamp()));
-                        offsets = 
KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, 
convertedCustomProperties, offsets);
-                        Preconditions.checkState(offsets.size() == 1);
-                        beginOffSet = offsets.get(0).second;
-                    } else if 
(kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
-                        beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
-                    } else if 
(kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                    } else {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
+        try {
+            for (Integer kafkaPartition : currentKafkaPartitions) {
+                if (!((KafkaProgress) 
progress).containsPartition(kafkaPartition)) {
+                    List<Integer> newPartitions = Lists.newArrayList();
+                    newPartitions.add(kafkaPartition);
+                    List<Pair<Integer, Long>> newPartitionsOffsets = 
getNewPartitionOffsetsFromDefaultOffset(newPartitions);
+                    Preconditions.checkState(newPartitionsOffsets.size() == 1);
+                    for (Pair<Integer, Long> partitionOffset : 
newPartitionsOffsets) {
+                        ((KafkaProgress) 
progress).addPartitionOffset(partitionOffset);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, 
id)
+                                    .add("kafka_partition_id", 
partitionOffset.first)
+                                    .add("begin_offset", 
partitionOffset.second)
+                                    .add("msg", "The new partition has been 
added in job"));
+                        }
                     }
                 }
-                ((KafkaProgress) 
progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet));
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                            .add("kafka_partition_id", kafkaPartition)
-                            .add("begin_offset", beginOffSet)
-                            .add("msg", "The new partition has been added in 
job"));
-                }
+            }
+        } catch (UserException e) {
+            unprotectUpdateState(JobState.PAUSED,
+                    new ErrorReason(InternalErrorCode.PARTITIONS_ERR, 
e.getMessage()), false /* not replay */);
+            throw e;
+        }
+    }
+
+    private List<Pair<Integer, Long>> 
getNewPartitionOffsetsFromDefaultOffset(List<Integer> newPartitions) throws 
UserException {
+        List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+        // get default offset
+        long beginOffset = convertedDefaultOffsetToLong();
+        for (Integer kafkaPartition : newPartitions) {
+            partitionOffsets.add(Pair.create(kafkaPartition, beginOffset));
+        }
+        if (isOffsetForTimes()) {
+            try {
+                partitionOffsets = 
KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, 
convertedCustomProperties, partitionOffsets);
+            } catch (LoadException e) {
+                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("partition:timestamp", 
Joiner.on(",").join(partitionOffsets))
+                        .add("error_msg", "Job failed to fetch current offsets 
from times with error " + e.getMessage())
+                        .build(), e);
+                throw new UserException(e);
             }
         }
+        return partitionOffsets;
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to