This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 23bce90f3b8f172f4acb20390d5868c97aab148e
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Wed Apr 27 23:21:17 2022 +0800

    [fix](routine-load) Fix bug that new coming routine load tasks are rejected all the time and report TOO_MANY_TASK error (#9164)

    ```
    CREATE ROUTINE LOAD iaas.dws_nat ON dws_nat
    WITH APPEND
    PROPERTIES
    (
        "desired_concurrent_number"="2",
        "max_batch_interval" = "20",
        "max_batch_rows" = "400000",
        "max_batch_size" = "314572800",
        "format" = "json",
        "max_error_number" = "0"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "xxxx:xxxx",
        "kafka_topic" = "nat_nsq",
        "property.kafka_default_offsets" = "2022-04-19 13:20:00"
    );
    ```

    In the create statement example above, the user did not specify custom partitions, so:
    1. FE fetches all Kafka partitions from the Kafka server in the routine load scheduler.

    The user set the default offset by datetime, so:
    2. FE fetches the Kafka offsets for that datetime from the Kafka server in the routine load scheduler.

    When step 1 succeeds but step 2 fails, the progress of this routine load may contain no partitions or offsets at all. Nevertheless, because newCurrentKafkaPartition (the partition list fetched from the Kafka server) may always equal currentKafkaPartitions, the broken progress is never updated.
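Before reading the patch, a minimal sketch of the FE-side idea may help. This is illustrative Java only, not the Doris source: `needUpdateProgress` and `partitionsInProgress` are hypothetical stand-ins for the check that KafkaRoutineLoadJob performs against its KafkaProgress. The point is that even when the partition list reported by Kafka is unchanged, the progress must still be refreshed if it is missing any current partition:

```
import java.util.List;
import java.util.Set;

// Illustrative sketch only; names are hypothetical stand-ins for the
// consistency check the patch adds in KafkaRoutineLoadJob.
class ProgressCheckSketch {
    // Returns true if the job progress must be rebuilt. Before the fix,
    // only the old and new partition lists were compared, so a progress
    // left empty by a failed offset-by-time lookup was never repaired.
    static boolean needUpdateProgress(List<Integer> currentKafkaPartitions,
                                      Set<Integer> partitionsInProgress) {
        for (Integer partition : currentKafkaPartitions) {
            if (!partitionsInProgress.contains(partition)) {
                // the progress is missing a live partition: force an update
                return true;
            }
        }
        return false;
    }
}
```

The diff below applies this check on the FE side, and additionally makes the BE reject task contexts whose begin_offset list is empty, so such a task pauses with a clear error instead of being rejected repeatedly with TOO_MANY_TASK.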
---
 be/src/runtime/routine_load/data_consumer_pool.cpp |  6 ++
 .../routine_load/routine_load_task_executor.cpp    |  2 +-
 .../load/routineload/KafkaRoutineLoadJob.java      | 90 +++++++++++++++-------
 3 files changed, 71 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/routine_load/data_consumer_pool.cpp b/be/src/runtime/routine_load/data_consumer_pool.cpp
index 5b0d9fabbe..5f039982a9 100644
--- a/be/src/runtime/routine_load/data_consumer_pool.cpp
+++ b/be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -68,11 +68,17 @@ Status DataConsumerPool::get_consumer_grp(StreamLoadContext* ctx,
     }
 
     DCHECK(ctx->kafka_info);
+    if (ctx->kafka_info->begin_offset.size() == 0) {
+        return Status::InternalError(
+                "PAUSE: The size of begin_offset of task should not be 0.");
+    }
+
     std::shared_ptr<KafkaDataConsumerGroup> grp = std::make_shared<KafkaDataConsumerGroup>();
 
     // one data consumer group contains at least one data consumers.
     int max_consumer_num = config::max_consumer_num_per_group;
     size_t consumer_num = std::min((size_t)max_consumer_num, ctx->kafka_info->begin_offset.size());
+
     for (int i = 0; i < consumer_num; ++i) {
         std::shared_ptr<DataConsumer> consumer;
         RETURN_IF_ERROR(get_consumer(ctx, &consumer));
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 1c72a52eaa..cce1646b76 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -356,7 +356,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
 
 void RoutineLoadTaskExecutor::err_handler(StreamLoadContext* ctx, const Status& st,
                                           const std::string& err_msg) {
-    LOG(WARNING) << err_msg;
+    LOG(WARNING) << err_msg << ", routine load task: " << ctx->brief(true);
     ctx->status = st;
     if (ctx->need_rollback) {
         _exec_env->stream_load_executor()->rollback_txn(ctx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bd67bd1685..f85898f10b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -132,6 +132,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         return TimeUtils.timeStringToLong(this.kafkaDefaultOffSet, timeZone);
     }
 
+    private long convertedDefaultOffsetToLong() {
+        if (this.kafkaDefaultOffSet.isEmpty()) {
+            return KafkaProgress.OFFSET_END_VAL;
+        } else {
+            if (isOffsetForTimes()) {
+                return convertedDefaultOffsetToTimestamp();
+            } else if (this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                return KafkaProgress.OFFSET_BEGINNING_VAL;
+            } else if (this.kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                return KafkaProgress.OFFSET_END_VAL;
+            } else {
+                return KafkaProgress.OFFSET_END_VAL;
+            }
+        }
+    }
+
     @Override
     public void prepare() throws UserException {
         super.prepare();
@@ -328,6 +344,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 }
                 return true;
             } else {
+                // if the partitions of currentKafkaPartitions and progress are inconsistent,
+                // We should also update the progress
+                for (Integer kafkaPartition : currentKafkaPartitions) {
+                    if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
+                        return true;
+                    }
+                }
                 return false;
             }
         } else {
@@ -406,37 +429,52 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
     }
 
-    private void updateNewPartitionProgress() throws LoadException {
+    private void updateNewPartitionProgress() throws UserException {
         // update the progress of new partitions
-        for (Integer kafkaPartition : currentKafkaPartitions) {
-            if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
-                // if offset is not assigned, start from OFFSET_END
-                long beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                if (!kafkaDefaultOffSet.isEmpty()) {
-                    if (isOffsetForTimes()) {
-                        // get offset by time
-                        List<Pair<Integer, Long>> offsets = Lists.newArrayList();
-                        offsets.add(Pair.create(kafkaPartition, convertedDefaultOffsetToTimestamp()));
-                        offsets = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, convertedCustomProperties, offsets);
-                        Preconditions.checkState(offsets.size() == 1);
-                        beginOffSet = offsets.get(0).second;
-                    } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
-                        beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
-                    } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                    } else {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
+        try {
+            for (Integer kafkaPartition : currentKafkaPartitions) {
+                if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
+                    List<Integer> newPartitions = Lists.newArrayList();
+                    newPartitions.add(kafkaPartition);
+                    List<Pair<Integer, Long>> newPartitionsOffsets = getNewPartitionOffsetsFromDefaultOffset(newPartitions);
+                    Preconditions.checkState(newPartitionsOffsets.size() == 1);
+                    for (Pair<Integer, Long> partitionOffset : newPartitionsOffsets) {
+                        ((KafkaProgress) progress).addPartitionOffset(partitionOffset);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                                    .add("kafka_partition_id", partitionOffset.first)
+                                    .add("begin_offset", partitionOffset.second)
+                                    .add("msg", "The new partition has been added in job"));
+                        }
                     }
                 }
-                ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet));
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                            .add("kafka_partition_id", kafkaPartition)
-                            .add("begin_offset", beginOffSet)
-                            .add("msg", "The new partition has been added in job"));
-                }
+            }
+        } catch (UserException e) {
+            unprotectUpdateState(JobState.PAUSED,
+                    new ErrorReason(InternalErrorCode.PARTITIONS_ERR, e.getMessage()), false /* not replay */);
+            throw e;
+        }
+    }
+
+    private List<Pair<Integer, Long>> getNewPartitionOffsetsFromDefaultOffset(List<Integer> newPartitions) throws UserException {
+        List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+        // get default offset
+        long beginOffset = convertedDefaultOffsetToLong();
+        for (Integer kafkaPartition : newPartitions) {
+            partitionOffsets.add(Pair.create(kafkaPartition, beginOffset));
+        }
+        if (isOffsetForTimes()) {
+            try {
+                partitionOffsets = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, convertedCustomProperties, partitionOffsets);
+            } catch (LoadException e) {
+                LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("partition:timestamp", Joiner.on(",").join(partitionOffsets))
+                        .add("error_msg", "Job failed to fetch current offsets from times with error " + e.getMessage())
+                        .build(), e);
+                throw new UserException(e);
+            }
         }
+        return partitionOffsets;
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org