This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 168abaeb279 [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298
168abaeb279 is described below

commit 168abaeb2793f784813bc8d4f2f7c2856fea97f7
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Thu Jan 25 14:08:40 2024 +0800

    [improve](routine-load) optimize error msg when failed to fetch Kafka info #30298
---
 .../load/routineload/KafkaRoutineLoadJob.java      |  8 ++--
 .../routine_load/test_routine_load_error.groovy    | 43 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index faad0a0248a..77171b6a4cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -325,13 +325,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         try {
             this.newCurrentKafkaPartition = getAllKafkaPartitions();
         } catch (Exception e) {
+            String msg = e.getMessage()
+                        + " may be Kafka properties set in job is error"
+                        + " or no partition in this topic that should check Kafka";
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
+                    .add("error_msg", msg)
                     .build(), e);
             if (this.state == JobState.NEED_SCHEDULE) {
                 unprotectUpdateState(JobState.PAUSED,
-                        new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
-                                "Job failed to fetch all current partition with error " + e.getMessage()),
+                        new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
                         false /* not replay */);
             }
         }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 191ea4381fd..5b42e84be00 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -248,4 +248,47 @@ suite("test_routine_load_error","p0") {
             sql """ DROP TABLE IF EXISTS ${tableName} """
         }
     }
+
+    // test failed to fetch all current partition
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "invalid_topic"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "invalid_topic",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    } 
+                    continue;
+                }
+                log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                break;
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to