This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-refactor_property
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-refactor_property by this push:
     new 3896b94db41  [feat](storage)Refactor HDFS integration: move basic checks to storage layer and add tests (#49932)
3896b94db41 is described below

commit 3896b94db41297b91f4a70aa0fbc3f8b73bb76bf
Author: Calvin Kirs <guoqi...@selectdb.com>
AuthorDate: Sat Apr 12 10:25:36 2025 +0800

    [feat](storage)Refactor HDFS integration: move basic checks to storage layer and add tests (#49932)

    ### What problem does this PR solve?

    Refactored HDFS parameter integration to align with the updated
    configuration structure. Moved basic validation and parameter
    transformation logic down to the HDFS-specific storage layer, keeping
    the business layer clean and storage-agnostic. Maintained consistent
    behavior at the business layer, ensuring it does not need to handle
    any storage-specific logic.
---
 .../doris/common/CatalogConfigFileUtils.java | 2 +-
 fe/fe-core/pom.xml | 10 +
 .../java/org/apache/doris/analysis/LoadStmt.java | 21 --
 .../property/storage/HDFSProperties.java | 37 ++-
 .../property/storage/HdfsPropertiesUtils.java | 100 +++++++
 .../datasource/property/storage/S3Properties.java | 6 +-
 .../property/storage/StorageProperties.java | 2 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 7 +-
 .../tablefunction/HdfsTableValuedFunction.java | 38 +--
 .../refactor_storage_param_p0/hdfs_all_test.groovy | 318 +++++++++++++++++++++
 10 files changed, 475 insertions(+), 66 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
index e6fe4c7f10c..6a1c552d972 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/CatalogConfigFileUtils.java
@@ -52,7 +52,7 @@ public class CatalogConfigFileUtils {
         // Iterate over the comma-separated list of resource files.
         for (String resource : resourcesPath.split(",")) {
             // Construct the full path to the resource file.
-            String resourcePath = configDir + File.separator + resource.trim();
+            String resourcePath = configDir + resource.trim();
             File file = new File(resourcePath);
 
             // Check if the file exists and is a regular file; if not, throw an exception.
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 371481a2e91..3cfd3b64739 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -573,6 +573,16 @@ under the License.
<groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-runtime</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <version>${hadoop.version}</version> + </dependency> <!-- lakesoul --> <dependency> diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index ba835821a3e..6b92bee2a56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.datasource.property.storage.ObjectStorageProperties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.load.EtlJobType; @@ -520,26 +519,6 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser { } - private String getProviderFromEndpoint() { - Map<String, String> properties = brokerDesc.getProperties(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) { - // S3 Provider properties should be case insensitive. - return entry.getValue().toUpperCase(); - } - } - return S3Properties.S3_PROVIDER; - } - - private String getBucketFromFilePath(String filePath) throws Exception { - String[] parts = filePath.split("\\/\\/"); - if (parts.length < 2) { - throw new Exception("filePath is not valid"); - } - String buckt = parts[1].split("\\/")[0]; - return buckt; - } - public String getComment() { return comment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java index 6b64df1817a..159a46b7e88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HDFSProperties.java @@ -17,12 +17,12 @@ package org.apache.doris.datasource.property.storage; -import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.ConnectorProperty; import com.google.common.base.Strings; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import java.util.HashMap; @@ -61,6 +61,9 @@ public class HDFSProperties extends StorageProperties { description = "Whether to enable the impersonation of HDFS.") private boolean hdfsImpersonationEnabled = false; + @ConnectorProperty(names = {"fs.defaultFS"}, required = false, description = "") + private String fsDefaultFS = ""; + /** * The final HDFS configuration map that determines the effective settings. 
* Priority rules: @@ -72,17 +75,26 @@ public class HDFSProperties extends StorageProperties { public HDFSProperties(Map<String, String> origProps) { super(Type.HDFS, origProps); - // to be care setOrigProps(matchParams); loadFinalHdfsConfig(origProps); } + public static boolean guessIsMe(Map<String, String> props) { + if (MapUtils.isEmpty(props)) { + return false; + } + if (props.containsKey("hadoop.config.resources") || props.containsKey("hadoop.security.authentication")) { + return true; + } + return false; + } + private void loadFinalHdfsConfig(Map<String, String> origProps) { if (MapUtils.isEmpty(origProps)) { return; } finalHdfsConfig = new HashMap<>(); origProps.forEach((key, value) -> { - if (key.startsWith("hadoop.") || key.startsWith("dfs.")) { + if (key.startsWith("hadoop.") || key.startsWith("dfs.") || key.equals("fs.defaultFS")) { finalHdfsConfig.put(key, value); } }); @@ -98,12 +110,14 @@ public class HDFSProperties extends StorageProperties { super.checkRequiredProperties(); checkConfigFileIsValid(hadoopConfigResources); if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) { - if (Strings.isNullOrEmpty(hdfsKerberosPrincipal) - || Strings.isNullOrEmpty(hdfsKerberosKeytab)) { + if (Strings.isNullOrEmpty(hdfsKerberosPrincipal) || Strings.isNullOrEmpty(hdfsKerberosKeytab)) { throw new IllegalArgumentException("HDFS authentication type is kerberos, " + "but principal or keytab is not set."); } } + if (StringUtils.isBlank(fsDefaultFS)) { + this.fsDefaultFS = HdfsPropertiesUtils.constructDefaultFsFromUri(origProps); + } } private void checkConfigFileIsValid(String configFile) { @@ -119,6 +133,9 @@ public class HDFSProperties extends StorageProperties { if (MapUtils.isNotEmpty(finalHdfsConfig)) { finalHdfsConfig.forEach(conf::set); } + if (StringUtils.isNotBlank(fsDefaultFS)) { + conf.set("fs.defaultFS", fsDefaultFS); + } //todo waiting be support should use new params conf.set("hdfs.security.authentication", hdfsAuthenticationType); if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) { @@ -131,20 +148,20 @@ public class HDFSProperties extends StorageProperties { } public Configuration getHadoopConfiguration() { - Configuration conf = new Configuration(false); + Configuration conf = new Configuration(true); Map<String, String> allProps = loadConfigFromFile(getResourceConfigPropName()); allProps.forEach(conf::set); if (MapUtils.isNotEmpty(finalHdfsConfig)) { finalHdfsConfig.forEach(conf::set); } - conf.set("hdfs.security.authentication", hdfsAuthenticationType); + /* conf.set("hadoop.kerberos.authentication", hdfsAuthenticationType); if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) { conf.set("hadoop.kerberos.principal", hdfsKerberosPrincipal); conf.set("hadoop.kerberos.keytab", hdfsKerberosKeytab); } if (!Strings.isNullOrEmpty(hadoopUsername)) { conf.set("hadoop.username", hadoopUsername); - } + }*/ return conf; } @@ -163,12 +180,12 @@ public class HDFSProperties extends StorageProperties { @Override public String convertUrlToFilePath(String url) throws UserException { - throw new NotImplementedException("Support HDFS is not implemented"); + return HdfsPropertiesUtils.convertUrlToFilePath(url); } @Override public String checkLoadPropsAndReturnUri(Map<String, String> loadProps) throws UserException { - throw new NotImplementedException("Support HDFS is not implemented"); + return HdfsPropertiesUtils.checkLoadPropsAndReturnUri(loadProps); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java new file mode 100644 index 00000000000..f605c977ec1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.storage; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.URI; + +import org.apache.commons.lang3.StringUtils; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class HdfsPropertiesUtils { + private static final String URI_KEY = "uri"; + + private static Set<String> supportSchema = new HashSet<>(); + + static { + supportSchema.add("hdfs"); + supportSchema.add("viewfs"); + } + + public static String checkLoadPropsAndReturnUri(Map<String, String> props) throws UserException { + if (props.isEmpty()) { + throw new UserException("props is empty"); + } + if (!props.containsKey(URI_KEY)) { + throw new UserException("props must contain uri"); + } + String uriStr = props.get(URI_KEY); + return convertAndCheckUri(uriStr); + } + + + public static String convertUrlToFilePath(String uriStr) throws UserException { + + return convertAndCheckUri(uriStr); + } + + public static String constructDefaultFsFromUri(Map<String, String> props) { + if (props.isEmpty()) { + return null; + } + if (!props.containsKey(URI_KEY)) { + return null; + } + String uriStr = props.get(URI_KEY); + if (StringUtils.isBlank(uriStr)) { + return null; + } + URI uri = null; + try { + uri = URI.create(uriStr); + } catch (AnalysisException e) { + return null; + } + String schema = uri.getScheme(); + if (StringUtils.isBlank(schema)) { + throw new IllegalArgumentException("Invalid uri: " + uriStr + "extract schema is null"); + } + if (!supportSchema.contains(schema.toLowerCase())) { + throw new IllegalArgumentException("Invalid export path:" + + schema + " , please use valid 'hdfs://' or 'viewfs://' path."); + } + return uri.getScheme() + "://" + uri.getAuthority(); + } + + private static String convertAndCheckUri(String uriStr) throws AnalysisException { + if (StringUtils.isBlank(uriStr)) { + throw new IllegalArgumentException("uri is null, pls check your params"); + } + URI uri = URI.create(uriStr); + String schema = uri.getScheme(); + if (StringUtils.isBlank(schema)) { + throw new IllegalArgumentException("Invalid uri: " + uriStr + "extract schema is null"); + } + if (!supportSchema.contains(schema.toLowerCase())) { + throw new IllegalArgumentException("Invalid export path:" + + schema + " , please use valid 'hdfs://' or 'viewfs://' path."); + } + return uri.getScheme() + "://" + 
uri.getAuthority() + uri.getPath(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java index d17504ba3de..7435efab2da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java @@ -32,12 +32,12 @@ import java.util.Map; public class S3Properties extends AbstractObjectStorageProperties { - @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "access_key"}, + @ConnectorProperty(names = {"s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT"}, required = false, description = "The endpoint of S3.") protected String s3Endpoint = ""; - @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "region"}, + @ConnectorProperty(names = {"s3.region", "AWS_REGION", "region", "REGION"}, required = false, description = "The region of S3.") protected String s3Region = "us-east-1"; @@ -46,7 +46,7 @@ public class S3Properties extends AbstractObjectStorageProperties { description = "The access key of S3.") protected String s3AccessKey = ""; - @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", "secret_key"}, + @ConnectorProperty(names = {"s3.secret_key", "AWS_SECRET_KEY", "secret_key", "SECRET_KEY"}, description = "The secret key of S3.") protected String s3SecretKey = ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java index 555a878937f..44e940924b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java @@ -107,7 +107,7 @@ public abstract class StorageProperties extends ConnectionProperties { public static StorageProperties createStorageProperties(Map<String, String> origProps) { StorageProperties storageProperties = null; // 1. parse the storage properties by user specified fs.xxx.support properties - if (isFsSupport(origProps, FS_HDFS_SUPPORT)) { + if (isFsSupport(origProps, FS_HDFS_SUPPORT) || HDFSProperties.guessIsMe(origProps)) { storageProperties = new HDFSProperties(origProps); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 49023f5a989..bb1b352fc3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -68,13 +68,14 @@ public class DFSFileSystem extends RemoteFileSystem { super(StorageBackend.StorageType.HDFS.name(), StorageBackend.StorageType.HDFS); - this.properties.putAll(properties); + this.properties.putAll(hdfsProperties.getOrigProps()); + this.storageProperties = hdfsProperties; this.hdfsProperties = hdfsProperties; } public DFSFileSystem(HDFSProperties hdfsProperties, StorageBackend.StorageType storageType) { super(storageType.name(), storageType); - this.properties.putAll(properties); + this.properties.putAll(hdfsProperties.getOrigProps()); this.hdfsProperties = hdfsProperties; } @@ -222,7 +223,7 @@ public class DFSFileSystem extends RemoteFileSystem { * @throws IOException when read data error. 
*/ private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length) - throws IOException { + throws IOException { synchronized (fsDataInputStream) { long currentStreamOffset; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java index 37984981486..5c4d9583296 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java @@ -18,15 +18,12 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.analysis.StorageBackend.StorageType; -import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.URI; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.thrift.TFileType; -import com.google.common.base.Strings; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -47,29 +44,16 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction { private void init(Map<String, String> properties) throws AnalysisException { // 1. analyze common properties - Map<String, String> otherProps = super.parseCommonProperties(properties); + Map<String, String> props = super.parseCommonProperties(properties); + this.storageProperties = StorageProperties.createStorageProperties(props); + locationProperties.putAll(storageProperties.getBackendConfigProperties()); // 2. analyze uri - String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); - if (Strings.isNullOrEmpty(uriStr)) { - throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); - } - URI uri = URI.create(uriStr); - //fixme refactor in HDFSStorageProperties - StorageBackend.checkUri(uri, StorageType.HDFS); - filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath(); - - // 3. 
analyze other properties - for (String key : otherProps.keySet()) { - if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) { - locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key)); - } else { - locationProperties.put(key, otherProps.get(key)); - } - } - // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority - if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) { - locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority()); + try { + String uri = storageProperties.checkLoadPropsAndReturnUri(props); + filePath = storageProperties.convertUrlToFilePath(uri); + } catch (UserException e) { + throw new AnalysisException("Failed check storage props", e); } if (!FeConstants.runningUnitTest) { @@ -92,7 +76,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction { @Override public BrokerDesc getBrokerDesc() { - return new BrokerDesc("HdfsTvfBroker", StorageType.HDFS, locationProperties); + return new BrokerDesc("HdfsTvfBroker", locationProperties, storageProperties); } // =========== implement abstract methods of TableValuedFunctionIf ================= diff --git a/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy b/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy new file mode 100644 index 00000000000..bddefc51d1d --- /dev/null +++ b/regression-test/suites/refactor_storage_param_p0/hdfs_all_test.groovy @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+import org.awaitility.Awaitility; +import static java.util.concurrent.TimeUnit.SECONDS; +import static groovy.test.GroovyAssert.shouldFail + +suite("refactor_params_hdfs_all_test", "p0,external,kerberos,external_docker,external_docker_kerberos") { + String enabled = context.config.otherConfigs.get("enableRefactorParamsHdfsTest") + if (enabled == null || enabled.equalsIgnoreCase("false")) { + return + } + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def table = "hdfs_all_test"; + + def databaseQueryResult = sql """ + select database(); + """ + println databaseQueryResult + def currentDBName = databaseQueryResult.get(0).get(0) + println currentDBName + // cos + + def createDBAndTbl = { String dbName -> + + sql """ + drop database if exists ${dbName} + """ + + sql """ + create database ${dbName} + """ + + sql """ + use ${dbName} + """ + sql """ + CREATE TABLE ${table}( + user_id BIGINT NOT NULL COMMENT "user id", + name VARCHAR(20) COMMENT "name", + age INT COMMENT "age" + ) + DUPLICATE KEY(user_id) + DISTRIBUTED BY HASH(user_id) BUCKETS 10 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """ + insert into ${table} values (1, 'a', 10); + """ + + def insertResult = sql """ + SELECT count(1) FROM ${table} + """ + + println "insertResult: ${insertResult}" + + assert insertResult.get(0).get(0) == 1 + } + + def hdfsNonXmlParams = "\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:8520\",\n" + + "\"hadoop.kerberos.min.seconds.before.relogin\" = \"5\",\n" + + "\"hadoop.security.authentication\" = \"kerberos\",\n" + + "\"hadoop.kerberos.principal\"=\"hive/presto-master.docker.clus...@labs.teradata.com\",\n" + + "\"hadoop.kerberos.keytab\" = \"/mnt/disk1/gq/keytabs/keytabs/hive-presto-master.keytab\",\n" + + "\"hive.metastore.sasl.enabled \" = \"true\",\n" + + "\"hadoop.security.auth_to_local\" = \"RULE:[2:\\\$1@\\\$0](.*@LABS.TERADATA.COM)s/@.*//\n" + + " RULE:[2:\\\$1@\\\$0](.*@OTHERLABS.TERADATA.COM)s/@.*//\n" + + " RULE:[2:\\\$1@\\\$0](.*@OTHERREALM.COM)s/@.*//\n" + + " DEFAULT\"" + + def createRepository = { String repoName, String location, String hdfsParams -> + try { + sql """ + drop repository ${repoName}; + """ + } catch (Exception e) { + // ignore exception, repo may not exist + } + + sql """ + CREATE REPOSITORY ${repoName} + WITH HDFS + ON LOCATION "${location}" + PROPERTIES ( + ${hdfsParams} + ); + """ + } + + def backupAndRestore = { String repoName, String dbName, String tableName, String backupLabel -> + sql """ + BACKUP SNAPSHOT ${dbName}.${backupLabel} + TO ${repoName} + ON (${tableName}) + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until( + { + def backupResult = sql """ + show backup from ${dbName} where SnapshotName = '${backupLabel}'; + """ + println "backupResult: ${backupResult}" + return backupResult.get(0).get(3) == "FINISHED" + }) + + def querySnapshotResult = sql """ + SHOW SNAPSHOT ON ${repoName} WHERE SNAPSHOT = '${backupLabel}'; + """ + println querySnapshotResult + def snapshotTimes = querySnapshotResult.get(0).get(1).split('\n') + def snapshotTime = snapshotTimes[0] + + sql """ + drop table if exists ${tableName}; + """ + + sql """ + RESTORE SNAPSHOT ${dbName}.${backupLabel} + FROM ${repoName} + ON (`${tableName}`) + PROPERTIES + ( + "backup_timestamp"="${snapshotTime}", + "replication_num" = "1" + ); + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until( + { + try { + + sql """ + use ${dbName} + """ + def restoreResult = sql """ + SELECT count(1) FROM ${tableName} + """ + println 
"restoreResult: ${restoreResult}" + def count = restoreResult.get(0).get(0) + println "count: ${count}" + return restoreResult.get(0).get(0) == 1 + } catch (Exception e) { + // tbl not found + println "tbl not found" + e.getMessage() + return false + } + }) + } + def hdfs_tvf = { filePath, hdfsParam -> + + def hdfs_tvf_sql = sql """ + select * from hdfs + + ( + 'uri'='${filePath}', + "format" = "csv", + ${hdfsParam} + ); + """ + } + def export_hdfs = { defaultFs, hdfsParams -> + def exportPath = defaultFs + "/test/_export/" + System.currentTimeMillis() + def exportLabel = "export_" + System.currentTimeMillis(); + sql """ + EXPORT TABLE ${table} + TO "${exportPath}" + PROPERTIES + ( + "label"="${exportLabel}", + "line_delimiter" = "," + ) + with HDFS + ( + + ${hdfsParams} + ); + """ + + databaseQueryResult = sql """ + select database(); + """ + currentDBName = databaseQueryResult.get(0).get(0) + Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({ + def exportResult = sql """ + SHOW EXPORT FROM ${currentDBName} WHERE LABEL = "${exportLabel}"; + + """ + + println exportResult + + if (null == exportResult || exportResult.isEmpty() || null == exportResult.get(0) || exportResult.get(0).size() < 3) { + return false; + } + if (exportResult.get(0).get(2) == 'CANCELLED' || exportResult.get(0).get(2) == 'FAILED') { + throw new RuntimeException("load failed") + } + + return exportResult.get(0).get(2) == 'FINISHED' + }) + + } + def outfile_to_hdfs = { defaultFs, hdfsParams -> + def outFilePath = "${defaultFs}/outfile_different_hdfs/exp_" + // select ... into outfile ... + def res = sql """ + SELECT * FROM ${table} ORDER BY user_id + INTO OUTFILE "${outFilePath}" + FORMAT AS CSV + PROPERTIES ( + ${hdfsParams} + ); + """ + return res[0][3] + } + def hdfsLoad = { filePath, hdfsParams -> + databaseQueryResult = sql """ + select database(); + """ + println databaseQueryResult + def dataCountResult = sql """ + SELECT count(*) FROM ${table} + """ + def dataCount = dataCountResult[0][0] + def label = "hdfs_load_label_" + System.currentTimeMillis() + def load = sql """ + LOAD LABEL `${label}` ( + data infile ("${filePath}") + into table ${table} + COLUMNS TERMINATED BY "\\\t" + FORMAT AS "CSV" + ( + user_id, + name, + age + )) + with hdfs + ( + ${hdfsParams} + ) + PROPERTIES + ( + "timeout" = "3600" + ); + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({ + def loadResult = sql """ + show load where label = '${label}'; + """ + println 'test' + println loadResult + + if (null == loadResult || loadResult.isEmpty() || null == loadResult.get(0) || loadResult.get(0).size() < 3) { + return false; + } + if (loadResult.get(0).get(2) == 'CANCELLED' || loadResult.get(0).get(2) == 'FAILED') { + throw new RuntimeException("load failed") + } + + return loadResult.get(0).get(2) == 'FINISHED' + }) + + + def expectedCount = dataCount + 1 + Awaitility.await().atMost(60, SECONDS).pollInterval(5, SECONDS).until({ + def loadResult = sql """ + select count(*) from ${table} + """ + println "loadResult: ${loadResult} " + return loadResult.get(0).get(0) == expectedCount + }) + + } + def defaultFs = 'hdfs://172.20.32.136:8520' + def repoName = 'hdfs_non_xml_repo'; + // create repo + createRepository(repoName,"${defaultFs}/test_repo",hdfsNonXmlParams); + def dbName1 = currentDBName + "${repoName}_1" + createDBAndTbl(dbName1) + def backupLabel=repoName+System.currentTimeMillis() + //backup and restore + backupAndRestore(repoName,dbName1,table,backupLabel) + def failedRepoName='failedRepo' + 
shouldFail {
+        createRepository(failedRepoName,"s3://172.20.32.136:8520",hdfsNonXmlParams);
+    }
+    shouldFail {
+        createRepository(failedRepoName," ",hdfsNonXmlParams);
+    }
+
+    //outfile
+    dbName1 = currentDBName + 'outfile_test_1'
+    createDBAndTbl(dbName1)
+    def outfile = outfile_to_hdfs(defaultFs, hdfsNonXmlParams);
+    println outfile
+    //hdfs tvf
+    def hdfsTvfResult = hdfs_tvf(outfile, hdfsNonXmlParams)
+    println hdfsTvfResult
+
+    //hdfsLoad(outfile,hdfsNonXmlParams)
+
+    //export
+    export_hdfs(defaultFs, hdfsNonXmlParams)
+
+
+}
\ No newline at end of file
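
For readers who want the gist of the new validation without reading the whole diff: the sketch below re-implements, in plain Java, the scheme/authority check that the new HdfsPropertiesUtils applies when it derives fs.defaultFS and normalizes load URIs. It is an illustration only, not code from this commit; the class and method names are made up, and it uses java.net.URI rather than Doris's org.apache.doris.common.util.URI, so parsing corner cases may differ from the committed code.

import java.net.URI;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only -- not part of the commit. It mirrors the
// scheme/authority validation in HdfsPropertiesUtils, but uses
// java.net.URI instead of org.apache.doris.common.util.URI.
public class HdfsUriCheckSketch {

    private static final Set<String> SUPPORTED_SCHEMES = Set.of("hdfs", "viewfs");

    // Roughly what constructDefaultFsFromUri does: derive "scheme://authority"
    // from the user-supplied 'uri' property, or return null when it is absent.
    static String defaultFsFromProps(Map<String, String> props) {
        String uriStr = props.get("uri");
        if (uriStr == null || uriStr.isBlank()) {
            return null;
        }
        URI uri = URI.create(uriStr);
        String scheme = uri.getScheme();
        if (scheme == null || !SUPPORTED_SCHEMES.contains(scheme.toLowerCase())) {
            throw new IllegalArgumentException(
                    "Invalid path: " + uriStr + ", please use an 'hdfs://' or 'viewfs://' path.");
        }
        return scheme + "://" + uri.getAuthority();
    }

    public static void main(String[] args) {
        // Prints "hdfs://nameservice1" for a typical load URI.
        System.out.println(defaultFsFromProps(
                Map.of("uri", "hdfs://nameservice1/user/doris/data.csv")));
    }
}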