This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4c52883acff [fix](routine load) fix show routine load task result incorrect (#38523) 4c52883acff is described below commit 4c52883acff993c68a55705cebf80943fbd1060e Author: hui lai <1353307...@qq.com> AuthorDate: Thu Aug 1 10:15:59 2024 +0800 [fix](routine load) fix show routine load task result incorrect (#38523) ### Bug report: Create a job: ``` CREATE ROUTINE LOAD testShow ON test_show_routine_load COLUMNS TERMINATED BY "," PROPERTIES ( "max_batch_interval" = "5", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "127.0.0.1:19092", "kafka_topic" = "test_show_routine_load", "property.kafka_default_offsets" = "OFFSET_BEGINNING" ); ``` show routine load task: ``` SHOW ROUTINE LOAD TASK WHERE JobName = "testShow"; ``` result: ``` ERROR 1105 (HY000): errCode = 2, detailMessage = The job named testshowdoes not exists or job state is stopped or cancelled ``` ### Solution Do not lowercase the job name with `toLowerCase`: the job lookup is case-sensitive, so `testShow` was searched for as `testshow` and not found. --- .../doris/analysis/ShowRoutineLoadTaskStmt.java | 2 +- .../routine_load/data/test_show_routine_load.csv | 1 + .../routine_load/test_show_routine_load.groovy | 167 +++++++++++++++------ 3 files changed, 120 insertions(+), 50 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java index 7258822ab50..570b82d3667 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java @@ -121,7 +121,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt { break CHECK; } StringLiteral stringLiteral = (StringLiteral) binaryPredicate.getChild(1); - jobName = stringLiteral.getValue().toLowerCase(); + jobName = stringLiteral.getValue(); } // CHECKSTYLE IGNORE THIS LINE if (!valid) { diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv b/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv new file mode 100644 index 00000000000..b226b99ee4e --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_show_routine_load.csv @@ -0,0 +1 @@ +1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi" \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy index 9fb681576d6..6075dc20dbe 100644 --- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy +++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy @@ -15,67 +15,136 @@ // specific language governing permissions and limitations // under the License. +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + suite("test_show_routine_load","p0") { + def kafkaCsvTpoics = [ + "test_show_routine_load", + ] + String enabled = context.config.otherConfigs.get("enableKafkaTest") String kafka_port = context.config.otherConfigs.get("kafka_port") String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") def kafka_broker = "${externalEnvIp}:${kafka_port}" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) - try { - sql """ - CREATE ROUTINE LOAD testShow - COLUMNS TERMINATED 
BY "|" - PROPERTIES - ( - "max_batch_interval" = "5", - "max_batch_rows" = "300000", - "max_batch_size" = "209715200" - ) - FROM KAFKA - ( - "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", - "kafka_topic" = "multi_table_load_invalid_table", - "property.kafka_default_offsets" = "OFFSET_BEGINNING" - ); - """ + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def tableName = "test_show_routine_load" + sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ - CREATE ROUTINE LOAD testShow1 - COLUMNS TERMINATED BY "|" - PROPERTIES - ( - "max_batch_interval" = "5", - "max_batch_rows" = "300000", - "max_batch_size" = "209715200" - ) - FROM KAFKA - ( - "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", - "kafka_topic" = "multi_table_load_invalid_table", - "property.kafka_default_offsets" = "OFFSET_BEGINNING" - ); + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); """ - sql "sync" - String db = context.config.getDbNameByFile(context.file) - log.info("reason of state changed: ${db}".toString()) + try { + sql """ + CREATE ROUTINE LOAD testShow ON ${tableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" 
+ ); + """ + + sql """ + CREATE ROUTINE LOAD testShow1 ON ${tableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + String db = context.config.getDbNameByFile(context.file) + log.info("reason of state changed: ${db}".toString()) + + def res = sql "show routine load for ${db}.testShow" + log.info("reason of state changed: ${res.size()}".toString()) + assertTrue(res.size() == 1) - def res = sql "show routine load for ${db}.testShow" - log.info("reason of state changed: ${res.size()}".toString()) - assertTrue(res.size() == 1) + res = sql "show routine load for testShow" + log.info("reason of state changed: ${res.size()}".toString()) + assertTrue(res.size() == 1) - res = sql "show routine load for testShow" - log.info("reason of state changed: ${res.size()}".toString()) - assertTrue(res.size() == 1) + res = sql "show all routine load" + log.info("reason of state changed: ${res.size()}".toString()) + assertTrue(res.size() > 1) - res = sql "show all routine load" - log.info("reason of state changed: ${res.size()}".toString()) - assertTrue(res.size() > 1) + res = sql "SHOW ROUTINE LOAD LIKE \"%testShow%\"" + log.info("reason of state changed: ${res.size()}".toString()) + assertTrue(res.size() == 2) - res = sql "SHOW ROUTINE LOAD LIKE \"%testShow%\"" - log.info("reason of state changed: ${res.size()}".toString()) - assertTrue(res.size() == 2) - } finally { - sql "stop routine load for testShow" - sql "stop routine load for testShow1" + def count = 0 + while (true) { + res = sql "select count(*) from ${tableName}" + def state = sql "show routine load for testShow1" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: 
${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(5000) + count++ + } + res = sql "SHOW ROUTINE LOAD TASK WHERE JobName = \"testShow1\";" + log.info("SHOW ROUTINE LOAD task result: ${res}".toString()) + assertTrue(res.size() == 1) + } finally { + sql "stop routine load for testShow" + sql "stop routine load for testShow1" + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org