This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7b8512ecb66 [fix](multi table) fix single stream multi table access denied in cloud mode (#38878)

7b8512ecb66 is described below

commit 7b8512ecb6628a47b2e030b11b56f408027d8d4b
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Aug 6 16:33:19 2024 +0800

    [fix](multi table) fix single stream multi table access denied in cloud mode (#38878)

    Creating a single stream multi table job to load a Kafka stream failed with:
    ```
    failed to get stream load plan, errCode = 2, detailMessage = Access denied for user 'doris_test@null' (using password: YES) '}
    ```
    There is no need to check the password for requests initiated internally,
    because it was already checked when the connection was established.
---
 .../org/apache/doris/load/StreamLoadHandler.java   |   2 +-
 .../routine_load/test_routine_load_with_user.out   |   7 +
 .../data/test_routine_load_with_user.csv           |   1 +
 .../test_routine_load_with_user_multi_table.csv    |   1 +
 .../test_routine_load_with_user.groovy             | 171 +++++++++++++++++++++
 5 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 496a2e4420d..0f44ec3f785 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -126,7 +126,7 @@ public class StreamLoadHandler {
         ctx.setRemoteIP(request.isSetAuthCode() ? clientAddr : request.getUserIp());
 
         String userName = ClusterNamespace.getNameFromFullName(request.getUser());
-        if (!request.isSetToken() && !Strings.isNullOrEmpty(userName)) {
+        if (!request.isSetToken() && !request.isSetAuthCode() && !Strings.isNullOrEmpty(userName)) {
             List<UserIdentity> currentUser = Lists.newArrayList();
             try {
                 Env.getCurrentEnv().getAuth().checkPlainPassword(userName,
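For readers skimming the diff, here is a minimal, self-contained model of the guard this one-line change alters. It is a sketch, not the FE code: the class name, method name, and boolean parameters are illustrative stand-ins for `request.isSetToken()`, `request.isSetAuthCode()`, and the user-name check.

```
// Minimal model of the condition changed by this patch (illustrative only).
public class AuthCheckSketch {
    // Returns true when the handler should run the plain-password check.
    static boolean needsPasswordCheck(boolean hasToken, boolean hasAuthCode, String userName) {
        // Before the patch: !hasToken && non-empty userName. Internal plan
        // requests that authenticate with an auth code (as single stream multi
        // table loads do in cloud mode) therefore still hit the password check
        // and failed with "Access denied".
        // After the patch: an auth code also skips the check, because the
        // credentials were already verified when the connection was created.
        return !hasToken && !hasAuthCode && userName != null && !userName.isEmpty();
    }

    public static void main(String[] args) {
        // Internal request: auth code set, no token -> no password check (the fix).
        System.out.println(needsPasswordCheck(false, true, "doris_test"));  // false
        // Ordinary external stream load with a user name -> check still runs.
        System.out.println(needsPasswordCheck(false, false, "doris_test")); // true
    }
}
```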
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_user.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_user.out
new file mode 100644
index 00000000000..c1ce8d95b2a
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_user.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_with_user --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
+-- !sql_with_user_multi_table --
+1 eab 2023-07-15 def 2023-07-20T05:48:31 "ghi"
+
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user_multi_table.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user_multi_table.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_with_user_multi_table.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
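For context on what "single stream multi table" means here, the new suite (next diff) issues both CREATE ROUTINE LOAD forms. Roughly, with placeholder job and table names and a placeholder broker address (the property values are the ones used by the test):

```
-- Single-table form: the job is bound to one target table via ON.
CREATE ROUTINE LOAD example_job ON example_tbl
COLUMNS TERMINATED BY ","
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:<port>",
    "kafka_topic" = "test_routine_load_with_user",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- Single stream multi table form: no ON clause; the target table is resolved
-- per Kafka record. Its internally issued stream-load plan requests carry an
-- auth code rather than a password, which is the code path this patch fixes.
CREATE ROUTINE LOAD example_job
COLUMNS TERMINATED BY ","
FROM KAFKA
(
    "kafka_broker_list" = "<broker_ip>:<port>",
    "kafka_topic" = "test_routine_load_with_user_multi_table",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```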
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
new file mode 100644
index 00000000000..4f9522c73bc
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_user.groovy
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_with_user","p0") {
+    def kafkaCsvTopics = [
+        "test_routine_load_with_user",
+        "test_routine_load_with_user_multi_table"
+    ]
+
+    String enabled = context.config.otherConfigs.get("enableKafkaTest")
+    String kafka_port = context.config.otherConfigs.get("kafka_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // Configure the Kafka producer
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // Create the Kafka producer and feed each topic from its CSV file
+        def producer = new KafkaProducer<>(props)
+
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                producer.send(record)
+            }
+        }
+    }
+
+    def jobName = "test_routine_load_with_user_job"
+    def tableName = "test_routine_load_with_user"
+    String user = 'test_routine_load_with_user'
+    def pwd = '123456'
+    sql """drop user if exists ${user}"""
+    sql """CREATE USER '${user}' IDENTIFIED BY '${pwd}'"""
+    sql """grant Load_priv on *.*.* to ${user}"""
+    sql """grant CREATE_priv on *.*.* to ${user}"""
+    sql """grant DROP_priv on *.*.* to ${user}"""
+    sql """grant Select_priv on *.*.* to ${user}"""
+    if (isCloudMode()) {
+        def clusters = sql " SHOW CLUSTERS; "
+        assertTrue(!clusters.isEmpty())
+        def validCluster = clusters[0][0]
+        sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO ${user}""";
+    }
+
+    connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
+        sql """ DROP TABLE IF EXISTS ${tableName}"""
+        sql """ CREATE TABLE IF NOT EXISTS ${tableName}
+                (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date NULL,
+                    `v2` string NULL,
+                    `v3` datetime NULL,
+                    `v4` string NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+            """
+        sql "sync"
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            try {
+                sql """
+                    CREATE ROUTINE LOAD ${jobName} ON ${tableName}
+                    COLUMNS TERMINATED BY ","
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${kafkaCsvTopics[0]}",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+                def count = 0
+                while (true) {
+                    def res = sql "select count(*) from ${tableName}"
+                    def state = sql "show routine load for ${jobName}"
+                    log.info("routine load state: ${state[0][8].toString()}".toString())
+                    log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                    log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                    if (res[0][0] > 0) {
+                        break
+                    }
+                    if (count >= 120) {
+                        log.error("routine load data did not become visible within the timeout")
+                        assertEquals(20, res[0][0])
+                        break
+                    }
+                    sleep(5000)
+                    count++
+                }
+                qt_sql_with_user "select * from ${tableName} order by k1"
+
+                sql "stop routine load for ${jobName}"
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY ","
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${kafkaCsvTopics[1]}",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+                count = 0
+                while (true) {
+                    def res = sql "select count(*) from ${tableName}"
+                    def state = sql "show routine load for ${jobName}"
+                    log.info("routine load state: ${state[0][8].toString()}".toString())
+                    log.info("routine load statistic: ${state[0][14].toString()}".toString())
+                    log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                    if (res[0][0] > 0) {
+                        break
+                    }
+                    if (count >= 120) {
+                        log.error("routine load data did not become visible within the timeout")
+                        assertEquals(20, res[0][0])
+                        break
+                    }
+                    sleep(5000)
+                    count++
+                }
+                qt_sql_with_user_multi_table "select * from ${tableName} order by k1"
+            } finally {
+                sql "stop routine load for ${jobName}"
+                sql "DROP TABLE IF EXISTS ${tableName}"
+            }
+        }
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
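A usage note on the suite above: its polling loops index rows of SHOW ROUTINE LOAD (job state, progress statistics, and the reason of the last state change; column positions can differ between versions, which is why the test indexes the result row directly) until loaded rows become visible, and the finally block always stops the job. The equivalent manual statements, using the job name from the test:

```
-- Inspect the job while it runs (state / statistic / reason of state changed):
SHOW ROUTINE LOAD FOR test_routine_load_with_user_job;

-- Clean up, as the suite's finally block does:
STOP ROUTINE LOAD FOR test_routine_load_with_user_job;
```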