This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 168abaeb279 [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298 168abaeb279 is described below commit 168abaeb2793f784813bc8d4f2f7c2856fea97f7 Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> AuthorDate: Thu Jan 25 14:08:40 2024 +0800 [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298 --- .../load/routineload/KafkaRoutineLoadJob.java | 8 ++-- .../routine_load/test_routine_load_error.groovy | 43 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index faad0a0248a..77171b6a4cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -325,13 +325,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { try { this.newCurrentKafkaPartition = getAllKafkaPartitions(); } catch (Exception e) { + String msg = e.getMessage() + + " may be Kafka properties set in job is error" + + " or no partition in this topic that should check Kafka"; LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage()) + .add("error_msg", msg) .build(), e); if (this.state == JobState.NEED_SCHEDULE) { unprotectUpdateState(JobState.PAUSED, - new ErrorReason(InternalErrorCode.PARTITIONS_ERR, - "Job failed to fetch all current partition with error " + e.getMessage()), + new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg), false /* not replay */); } } diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy index 191ea4381fd..5b42e84be00 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy @@ -248,4 +248,47 @@ suite("test_routine_load_error","p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ } } + + // test failed to fetch all current partition + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def jobName = "invalid_topic" + try { + sql """ + CREATE ROUTINE LOAD ${jobName} + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "invalid_topic", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def count = 0 + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state != "PAUSED") { + count++ + if (count > 60) { + assertEquals(1, 2) + } + continue; + } + log.info("reason of state changed: ${res[0][17].toString()}".toString()) + assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka")) + break; + } + } finally { + sql "stop routine load for ${jobName}" + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org