This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 1adb4cfdd9 [Enhancement](tvf) Table value function support reading local file (#22915) 1adb4cfdd9 is described below commit 1adb4cfdd954648b9aae96f4c91a271422fb711c Author: Mingyu Chen <morning...@163.com> AuthorDate: Sun Aug 13 19:39:16 2023 +0800 [Enhancement](tvf) Table value function support reading local file (#22915) cherry pick #17404 --- be/src/io/fs/err_utils.cpp | 20 +++ be/src/io/fs/err_utils.h | 1 + be/src/io/fs/local_file_system.cpp | 50 +++++++ be/src/io/fs/local_file_system.h | 11 ++ be/src/olap/tablet_schema_cache.cpp | 1 - be/src/service/internal_service.cpp | 22 +++ be/src/service/internal_service.h | 3 + be/test/io/fs/local_file_system_test.cpp | 30 +++++ docs/en/docs/admin-manual/config/be-config.md | 5 + .../sql-functions/table-functions/local.md | 150 +++++++++++++++++++++ docs/sidebars.json | 1 + docs/zh-CN/docs/admin-manual/config/be-config.md | 5 + .../sql-functions/table-functions/local.md | 147 ++++++++++++++++++++ .../doris/analysis/TableValuedFunctionRef.java | 8 +- .../doris/planner/external/FileQueryScanNode.java | 4 +- .../apache/doris/planner/external/TVFScanNode.java | 19 +++ .../org/apache/doris/rpc/BackendServiceClient.java | 4 + .../org/apache/doris/rpc/BackendServiceProxy.java | 12 ++ .../ExternalFileTableValuedFunction.java | 21 +-- .../tablefunction/LocalTableValuedFunction.java | 145 ++++++++++++++++++++ .../doris/tablefunction/TableValuedFunctionIf.java | 2 + gensrc/proto/internal_service.proto | 14 ++ .../external_table_p0/tvf/test_local_tvf.groovy | 67 +++++++++ 23 files changed, 729 insertions(+), 13 deletions(-) diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp index 5530b25a0d..648a358850 100644 --- a/be/src/io/fs/err_utils.cpp +++ b/be/src/io/fs/err_utils.cpp @@ -53,5 +53,25 @@ std::string hdfs_error() { return ss.str(); } +std::string glob_err_to_str(int code) { + std::string msg; + // 
https://sites.uclouvain.be/SystInfo/usr/include/glob.h.html + switch (code) { + case 1: + msg = "Ran out of memory"; + break; + case 2: + msg = "read error"; + break; + case 3: + msg = "No matches found"; + break; + default: + msg = "unknown"; + break; + } + return fmt::format("({}), {}", code, msg); +} + } // namespace io } // namespace doris diff --git a/be/src/io/fs/err_utils.h b/be/src/io/fs/err_utils.h index 31ca702c32..971596fab1 100644 --- a/be/src/io/fs/err_utils.h +++ b/be/src/io/fs/err_utils.h @@ -26,6 +26,7 @@ namespace io { std::string errno_to_str(); std::string errcode_to_str(const std::error_code& ec); std::string hdfs_error(); +std::string glob_err_to_str(int code); } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 01cd8829dd..48c1981202 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -19,6 +19,7 @@ #include <fcntl.h> #include <fmt/format.h> +#include <glob.h> #include <glog/logging.h> #include <openssl/md5.h> #include <sys/mman.h> @@ -428,5 +429,54 @@ const std::shared_ptr<LocalFileSystem>& global_local_filesystem() { return local_fs; } +Status LocalFileSystem::canonicalize_local_file(const std::string& dir, + const std::string& file_path, + std::string* full_path) { + const std::string absolute_path = dir + "/" + file_path; + std::string canonical_path; + RETURN_IF_ERROR(canonicalize(absolute_path, &canonical_path)); + if (!contain_path(dir, canonical_path)) { + return Status::InvalidArgument("file path is not allowed: {}", canonical_path); + } + + *full_path = canonical_path; + return Status::OK(); +} + +Status LocalFileSystem::safe_glob(const std::string& path, std::vector<FileInfo>* res) { + if (path.find("..") != std::string::npos) { + return Status::InvalidArgument("can not contain '..' 
in path"); + } + std::string full_path = config::user_files_secure_path + "/" + path; + std::vector<std::string> files; + RETURN_IF_ERROR(_glob(full_path, &files)); + for (auto& file : files) { + FileInfo fi; + fi.is_file = true; + RETURN_IF_ERROR(canonicalize_local_file("", file, &(fi.file_name))); + RETURN_IF_ERROR(file_size_impl(fi.file_name, &(fi.file_size))); + res->push_back(std::move(fi)); + } + return Status::OK(); +} + +Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::string>* res) { + glob_t glob_result; + memset(&glob_result, 0, sizeof(glob_result)); + + int rc = glob(pattern.c_str(), GLOB_TILDE, NULL, &glob_result); + if (rc != 0) { + globfree(&glob_result); + return Status::IOError("failed to glob {}: {}", pattern, glob_err_to_str(rc)); + } + + for (size_t i = 0; i < glob_result.gl_pathc; ++i) { + res->push_back(std::string(glob_result.gl_pathv[i])); + } + + globfree(&glob_result); + return Status::OK(); +} + } // namespace io } // namespace doris diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index d9c0ec96c8..1f8d35c096 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -72,6 +72,15 @@ public: // read local file and save content to "content" Status read_file_to_string(const Path& file, std::string* content); + Status canonicalize_local_file(const std::string& dir, const std::string& file_path, + std::string* full_path); + + // glob list the files match the path pattern. + // the result will be saved in "res", in absolute path with file size. 
+ // "safe" means the path will be concat with the path prefix config::user_files_secure_path, + // so that it can not list any files outside the config::user_files_secure_path + Status safe_glob(const std::string& path, std::vector<FileInfo>* res); + protected: Status create_file_impl(const Path& file, FileWriterPtr* writer) override; Status open_file_impl(const FileDescription& file_desc, const Path& abs_path, @@ -97,6 +106,8 @@ protected: Status delete_directory_or_file_impl(const Path& path); private: + // a wrapper for glob(), return file list in "res" + Status _glob(const std::string& pattern, std::vector<std::string>* res); LocalFileSystem(Path&& root_path, std::string&& id = ""); }; diff --git a/be/src/olap/tablet_schema_cache.cpp b/be/src/olap/tablet_schema_cache.cpp index ee14358495..e14c3f7ecc 100644 --- a/be/src/olap/tablet_schema_cache.cpp +++ b/be/src/olap/tablet_schema_cache.cpp @@ -74,7 +74,6 @@ void TabletSchemaCache::_recycle() { } } _is_stopped = true; - LOG(INFO) << "xxx yyy stopped "; } } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 3a5aa06125..1bbecfca4e 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -55,6 +55,7 @@ #include "common/status.h" #include "gutil/integral_types.h" #include "http/http_client.h" +#include "io/fs/local_file_system.h" #include "io/fs/stream_load_pipe.h" #include "io/io_common.h" #include "olap/data_dir.h" @@ -1608,4 +1609,25 @@ void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request, response); } +void PInternalServiceImpl::glob(google::protobuf::RpcController* controller, + const PGlobRequest* request, PGlobResponse* response, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, response, done]() { + brpc::ClosureGuard closure_guard(done); + std::vector<io::FileInfo> files; 
+ Status st = io::global_local_filesystem()->safe_glob(request->pattern(), &files); + if (st.ok()) { + for (auto& file : files) { + PGlobResponse_PFileInfo* pfile = response->add_files(); + pfile->set_file(file.file_name); + pfile->set_size(file.file_size); + } + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _heavy_work_pool); + } +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index aa30959ca3..47762cf7e5 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -181,6 +181,9 @@ public: PGetTabletVersionsResponse* response, google::protobuf::Closure* done) override; + void glob(google::protobuf::RpcController* controller, const PGlobRequest* request, + PGlobResponse* response, google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp index ea452782b2..953d7669b8 100644 --- a/be/test/io/fs/local_file_system_test.cpp +++ b/be/test/io/fs/local_file_system_test.cpp @@ -610,4 +610,34 @@ TEST_F(LocalFileSystemTest, TestRandomWrite) { EXPECT_TRUE(file_reader->close().ok()); } } + +TEST_F(LocalFileSystemTest, TestGlob) { + std::string path = "./be/ut_build_ASAN/test/file_path/"; + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok()); + EXPECT_TRUE(io::global_local_filesystem() + ->create_directory("./be/ut_build_ASAN/test/file_path/1") + .ok()); + EXPECT_TRUE(io::global_local_filesystem() + ->create_directory("./be/ut_build_ASAN/test/file_path/2") + .ok()); + EXPECT_TRUE(io::global_local_filesystem() + ->create_directory("./be/ut_build_ASAN/test/file_path/3") + .ok()); + + save_string_file("./be/ut_build_ASAN/test/file_path/1/f1.txt", "just test"); + 
save_string_file("./be/ut_build_ASAN/test/file_path/1/f2.txt", "just test"); + save_string_file("./be/ut_build_ASAN/test/file_path/f3.txt", "just test"); + + std::vector<io::FileInfo> files; + EXPECT_FALSE(io::global_local_filesystem()->safe_glob("./../*.txt", &files).ok()); + EXPECT_FALSE(io::global_local_filesystem()->safe_glob("/*.txt", &files).ok()); + EXPECT_TRUE(io::global_local_filesystem()->safe_glob("./file_path/1/*.txt", &files).ok()); + EXPECT_EQ(2, files.size()); + files.clear(); + EXPECT_TRUE(io::global_local_filesystem()->safe_glob("./file_path/*/*.txt", &files).ok()); + EXPECT_EQ(2, files.size()); + + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok()); +} + } // namespace doris diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 601577f252..88a044fdfd 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1465,3 +1465,8 @@ Indicates how many tablets failed to load in the data directory. At the same tim * Description: If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled. If false, cancel query when the memory used exceeds exec_mem_limit. * Default value: true + +#### `user_files_secure_path` + +* Description: The storage directory for files queried by `local` table valued functions. 
+* Default value: `${DORIS_HOME}` \ No newline at end of file diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/local.md b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md new file mode 100644 index 0000000000..ab3be7d4ae --- /dev/null +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md @@ -0,0 +1,150 @@ +--- +{ + "title": "local", + "language": "en" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Local + +### Name + +<version since="dev"> + +local + +</version> + +### Description + +The local table-valued function (tvf) allows users to read and access the contents of local files on a BE node, just like accessing a relational table. It currently supports the `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` file formats. + +Using it requires the `ADMIN` privilege. + +#### syntax + +```sql +local( + "file_path" = "path/to/file.txt", + "backend_id" = "be_id", + "format" = "csv", + "keyn" = "valuen" + ... 
+ ); +``` + +**parameter description** + +Parameters for accessing local files on a BE node: + +- `file_path`: + + (required) The path of the file to be read, relative to the `user_files_secure_path` directory. The `user_files_secure_path` parameter [can be configured on the BE](../../../admin-manual/config/be-config.md). + + The path must not contain `..`. Glob syntax is supported for matching multiple files, such as `log/*.log`. + +- `backend_id`: + + (required) The id of the backend where the file resides. The `backend_id` can be obtained with the `show backends` command. + +File format parameters: + +- `format`: (required) Currently supports `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` +- `column_separator`: (optional) default `,`. +- `line_delimiter`: (optional) default `\n`. +- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, in which case the type is inferred automatically from the suffix of the file path. + + The following 6 parameters are used for loading in json format. For specific usage methods, please refer to: [Json Load](../../../data-operate/import/import-way/load-json-format.md) + +- `read_json_by_line`: (optional) default `"true"` +- `strip_outer_array`: (optional) default `"false"` +- `json_root`: (optional) default `""` +- `json_paths`: (optional) default `""` +- `num_as_string`: (optional) default `false` +- `fuzzy_parse`: (optional) default `false` + + <version since="dev">The following 2 parameters are used for loading in csv format</version> + +- `trim_double_quotes`: Boolean type (optional), the default value is `false`. `true` means that the outermost double quotes of each field in the csv file are trimmed. +- `skip_lines`: Integer type (optional), the default value is 0. The number of lines to skip at the head of the csv file. It is ignored when the format is `csv_with_names` or `csv_with_names_and_types`. 
+ +### Examples + +Analyze the log file on specified BE: + +```sql +mysql> select * from local( + "file_path" = "log/be.out", + "backend_id" = "10006", + "format" = "csv") + where c1 like "%start_time%" limit 10; ++--------------------------------------------------------+ +| c1 | ++--------------------------------------------------------+ +| start time: 2023年 08月 07日 星期一 23:20:32 CST | +| start time: 2023年 08月 07日 星期一 23:32:10 CST | +| start time: 2023年 08月 08日 星期二 00:20:50 CST | +| start time: 2023年 08月 08日 星期二 00:29:15 CST | ++--------------------------------------------------------+ +``` + +Read and access csv format files located at path `${DORIS_HOME}/student.csv`: + +```sql +mysql> select * from local( + "file_path" = "student.csv", + "backend_id" = "10003", + "format" = "csv"); ++------+---------+--------+ +| c1 | c2 | c3 | ++------+---------+--------+ +| 1 | alice | 18 | +| 2 | bob | 20 | +| 3 | jack | 24 | +| 4 | jackson | 19 | +| 5 | liming | d18 | ++------+---------+--------+ +``` + +Can be used with `desc function` : + +```sql +mysql> desc function local( + "file_path" = "student.csv", + "backend_id" = "10003", + "format" = "csv"); ++-------+------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++-------+------+------+-------+---------+-------+ +| c1 | TEXT | Yes | false | NULL | NONE | +| c2 | TEXT | Yes | false | NULL | NONE | +| c3 | TEXT | Yes | false | NULL | NONE | ++-------+------+------+-------+---------+-------+ +``` + +### Keywords + + local, table-valued-function, tvf + +### Best Practice + + For more detailed usage of local tvf, please refer to [S3](./s3.md) tvf, The only difference between them is the way of accessing the storage system. 
diff --git a/docs/sidebars.json b/docs/sidebars.json index 52757c8f8d..a03377253e 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -700,6 +700,7 @@ "sql-manual/sql-functions/table-functions/s3", "sql-manual/sql-functions/table-functions/hdfs", "sql-manual/sql-functions/table-functions/iceberg_meta", + "sql-manual/sql-functions/table-functions/local", "sql-manual/sql-functions/table-functions/backends", "sql-manual/sql-functions/table-functions/frontends", "sql-manual/sql-functions/table-functions/workload-group", diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index 291c711470..5e48d603f6 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1494,3 +1494,8 @@ load tablets from header failed, failed tablets size: xxx, path=xxx * 描述: 如果为true,则当内存未超过 exec_mem_limit 时,查询内存将不受限制;当进程内存超过 exec_mem_limit 且大于 2GB 时,查询会被取消。如果为false,则在使用的内存超过 exec_mem_limit 时取消查询。 * 默认值: true + +#### `user_files_secure_path` + +* 描述: `local` 表函数查询的文件的存储目录。 +* 默认值: `${DORIS_HOME}` \ No newline at end of file diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md new file mode 100644 index 0000000000..e4ed638017 --- /dev/null +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md @@ -0,0 +1,147 @@ +--- +{ + "title": "local", + "language": "zh-CN" +} +--- + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## local + +### Name + +<version since="dev"> + +local + +</version> + +### Description + +Local表函数(table-valued-function,tvf),可以让用户像访问关系表格式数据一样,读取并访问 be 上的文件内容。目前支持`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`文件格式。 + +该函数需要 ADMIN 权限。 + +#### syntax +```sql +local( + "file_path" = "path/to/file.txt", + "backend_id" = "be_id", + "format" = "csv", + "keyn" = "valuen" + ... + ); +``` + +**参数说明** + +访问local文件的相关参数: +- `file_path` + + (必填)待读取文件的路径,该路径是一个相对于 `user_files_secure_path` 目录的相对路径, 其中 `user_files_secure_path` 参数是 [be的一个配置项](../../../admin-manual/config/be-config.md) 。 + + 路径中不能包含 `..`,可以使用 glob 语法进行模糊匹配,如:`logs/*.log` + +- `backend_id`: + + (必填)文件所在的 be id。 `backend_id` 可以通过 `show backends` 命令得到。 + +文件格式相关参数 +- `format`:(必填) 目前支持 `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` +- `column_separator`:(选填) 列分割符, 默认为`,`。 +- `line_delimiter`:(选填) 行分割符,默认为`\n`。 +- `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 `UNKNOWN`, 将会根据 `uri` 的后缀自动推断类型。 + + 下面6个参数是用于json格式的导入,具体使用方法可以参照:[Json Load](../../../data-operate/import/import-way/load-json-format.md) + +- `read_json_by_line`: (选填) 默认为 `"true"` +- `strip_outer_array`: (选填) 默认为 `"false"` +- `json_root`: (选填) 默认为空 +- `json_paths`: (选填) 默认为空 +- `num_as_string`: (选填) 默认为 `false` +- `fuzzy_parse`: (选填) 默认为 `false` + + <version since="dev">下面2个参数是用于csv格式的导入</version> + +- `trim_double_quotes`: 布尔类型,选填,默认值为 `false`,为 `true` 时表示裁剪掉 csv 文件每个字段最外层的双引号 +- `skip_lines`: 整数类型,选填,默认值为0,含义为跳过csv文件的前几行。当设置format设置为 `csv_with_names` 或 `csv_with_names_and_types` 时,该参数会失效 + 
+### Examples + +分析指定 BE 上的日志文件: + +```sql +mysql> select * from local( + "file_path" = "log/be.out", + "backend_id" = "10006", + "format" = "csv") + where c1 like "%start_time%" limit 10; ++--------------------------------------------------------+ +| c1 | ++--------------------------------------------------------+ +| start time: 2023年 08月 07日 星期一 23:20:32 CST | +| start time: 2023年 08月 07日 星期一 23:32:10 CST | +| start time: 2023年 08月 08日 星期二 00:20:50 CST | +| start time: 2023年 08月 08日 星期二 00:29:15 CST | ++--------------------------------------------------------+ +``` + +读取和访问位于路径`${DORIS_HOME}/student.csv`的 csv格式文件: + +```sql +mysql> select * from local( + "file_path" = "student.csv", + "backend_id" = "10003", + "format" = "csv"); ++------+---------+--------+ +| c1 | c2 | c3 | ++------+---------+--------+ +| 1 | alice | 18 | +| 2 | bob | 20 | +| 3 | jack | 24 | +| 4 | jackson | 19 | +| 5 | liming | d18 | ++------+---------+--------+ +``` + +可以配合`desc function`使用 + +```sql +mysql> desc function local( + "file_path" = "student.csv", + "backend_id" = "10003", + "format" = "csv"); ++-------+------+------+-------+---------+-------+ +| Field | Type | Null | Key | Default | Extra | ++-------+------+------+-------+---------+-------+ +| c1 | TEXT | Yes | false | NULL | NONE | +| c2 | TEXT | Yes | false | NULL | NONE | +| c3 | TEXT | Yes | false | NULL | NONE | ++-------+------+------+-------+---------+-------+ +``` + +### Keywords + + local, table-valued-function, tvf + +### Best Practice + + 关于local tvf的更详细使用方法可以参照 [S3](./s3.md) tvf, 唯一不同的是访问存储系统的方式不一样。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java index ba1b07eb4c..166b3297ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java @@ -27,6 +27,7 @@ import 
org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.tablefunction.BackendsTableValuedFunction; +import org.apache.doris.tablefunction.LocalTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; import java.util.Map; @@ -103,11 +104,12 @@ public class TableValuedFunctionRef extends TableRef { return; } - // check privilige for backends tvf - if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)) { + // check privilige for backends/local tvf + if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME) + || funcName.equalsIgnoreCase(LocalTableValuedFunction.NAME)) { if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN) && !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), - PrivPredicate.OPERATOR)) { + PrivPredicate.OPERATOR)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN/OPERATOR"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 586baa58d0..db8f835abb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -364,7 +364,8 @@ public abstract class FileQueryScanNode extends FileScanNode { params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } } - } else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) { + } else if ((locationType == TFileType.FILE_S3 || locationType == TFileType.FILE_LOCAL) + && !params.isSetProperties()) { params.setProperties(locationProperties); } @@ -405,6 +406,7 @@ public abstract class FileQueryScanNode extends FileScanNode { rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); } else 
if (locationType == TFileType.FILE_S3 || locationType == TFileType.FILE_BROKER + || locationType == TFileType.FILE_LOCAL || locationType == TFileType.FILE_NET) { // need full path rangeDesc.setPath(fileSplit.getPath().toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java index 476e16b098..d895856401 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java @@ -18,6 +18,7 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionGenTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; @@ -27,7 +28,9 @@ import org.apache.doris.common.util.Util; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.system.Backend; import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.tablefunction.LocalTableValuedFunction; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; @@ -40,6 +43,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -62,6 +66,21 @@ public class TVFScanNode extends FileQueryScanNode { } @Override + protected void initBackendPolicy() throws UserException { + List<String> preferLocations = new ArrayList<>(); + if (tableValuedFunction instanceof LocalTableValuedFunction) { + // For local tvf, the backend was specified by backendId + Long backendId = ((LocalTableValuedFunction) 
tableValuedFunction).getBackendId(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + if (backend == null) { + throw new UserException("Backend " + backendId + " does not exist"); + } + preferLocations.add(backend.getHost()); + } + backendPolicy.init(preferLocations); + numNodes = backendPolicy.numBackends(); + } + protected String getFsName(FileSplit split) { return tableValuedFunction.getFsName(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 484f6e77ed..8df33927fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -142,6 +142,10 @@ public class BackendServiceClient { return stub.getColumnIdsByTabletIds(request); } + public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) { + return stub.glob(request); + } + public void shutdown() { if (!channel.isShutdown()) { channel.shutdown(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 39dfd7915f..5fc3ef2815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -387,4 +387,16 @@ public class BackendServiceProxy { } } + public Future<InternalService.PGlobResponse> glob(TNetworkAddress address, + InternalService.PGlobRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.glob(request); + } catch (Throwable e) { + LOG.warn("failed to glob dir from BE {}:{}, path: {}, error: ", + address.getHostname(), address.getPort(), request.getPattern()); + throw new RpcException(address.hostname, e.getMessage()); + } + } + } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 7e1a4b6698..035d54c0d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.PrimitiveType; @@ -347,18 +348,13 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return columns; } // get one BE address - TNetworkAddress address = null; columns = Lists.newArrayList(); - for (Backend be : org.apache.doris.catalog.Env.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAlive()) { - address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); - break; - } - } - if (address == null) { + Backend be = getBackend(); + if (be == null) { throw new AnalysisException("No Alive backends"); } + TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); try { PFetchTableSchemaRequest request = getFetchTableStructureRequest(); Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance() @@ -390,6 +386,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return columns; } + protected Backend getBackend() { + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + return be; + } + } + return null; + } + /** * Convert PTypeDesc into doris column type * @param typeNodes list PTypeNodes in PTypeDesc diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java new file mode 100644 index 0000000000..f6693317ba --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PGlobResponse; +import org.apache.doris.proto.InternalService.PGlobResponse.PFileInfo; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TNetworkAddress; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.commons.collections.map.CaseInsensitiveMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * The implement of table valued function + * local("file_path" = "path/to/file.txt", "backend_id" = "be_id"). 
+ */ +public class LocalTableValuedFunction extends ExternalFileTableValuedFunction { + private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class); + + public static final String NAME = "local"; + public static final String FILE_PATH = "file_path"; + public static final String BACKEND_ID = "backend_id"; + + private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>() + .add(FILE_PATH) + .add(BACKEND_ID) + .build(); + + private String filePath; + private long backendId; + + public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException { + Map<String, String> fileFormatParams = new CaseInsensitiveMap(); + locationProperties = Maps.newHashMap(); + for (String key : params.keySet()) { + if (FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) { + fileFormatParams.put(key, params.get(key)); + } else if (LOCATION_PROPERTIES.contains(key.toLowerCase())) { + locationProperties.put(key.toLowerCase(), params.get(key)); + } else { + throw new AnalysisException(key + " is invalid property"); + } + } + + if (!locationProperties.containsKey(FILE_PATH)) { + throw new AnalysisException(String.format("Configuration '%s' is required.", FILE_PATH)); + } + if (!locationProperties.containsKey(BACKEND_ID)) { + throw new AnalysisException(String.format("Configuration '%s' is required.", BACKEND_ID)); + } + + filePath = locationProperties.get(FILE_PATH); + backendId = Long.parseLong(locationProperties.get(BACKEND_ID)); + parseProperties(fileFormatParams); + + getFileListFromBackend(); + } + + private void getFileListFromBackend() throws AnalysisException { + Backend be = Env.getCurrentSystemInfo().getBackend(backendId); + if (be == null) { + throw new AnalysisException("backend not found with backend_id = " + backendId); + } + + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + TNetworkAddress address = be.getBrpcAdress(); + InternalService.PGlobRequest.Builder requestBuilder = 
InternalService.PGlobRequest.newBuilder(); + requestBuilder.setPattern(filePath); + try { + Future<PGlobResponse> response = proxy.glob(address, requestBuilder.build()); + PGlobResponse globResponse = response.get(5, TimeUnit.SECONDS); + if (globResponse.getStatus().getStatusCode() != 0) { + throw new AnalysisException( + "error code: " + globResponse.getStatus().getStatusCode() + + ", error msg: " + globResponse.getStatus().getErrorMsgsList()); + } + for (PFileInfo file : globResponse.getFilesList()) { + fileStatuses.add(new TBrokerFileStatus(file.getFile().trim(), false, file.getSize(), true)); + LOG.info("get file from backend success. file: {}, size: {}", file.getFile(), file.getSize()); + } + } catch (Exception e) { + throw new AnalysisException("get file list from backend failed. " + e.getMessage()); + } + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_LOCAL; + } + + @Override + public String getFilePath() { + return filePath; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("LocalTvfBroker", StorageType.LOCAL, locationProperties); + } + + @Override + public String getTableName() { + return "LocalTableValuedFunction"; + } + + public Long getBackendId() { + return backendId; + } + + @Override + protected Backend getBackend() { + return Env.getCurrentSystemInfo().getBackend(backendId); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index ea135b8b1b..2c67178494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf { return new S3TableValuedFunction(params); case HdfsTableValuedFunction.NAME: return new HdfsTableValuedFunction(params); + case LocalTableValuedFunction.NAME: + 
return new LocalTableValuedFunction(params); case IcebergTableValuedFunction.NAME: return new IcebergTableValuedFunction(params); case BackendsTableValuedFunction.NAME: diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index de1f7876dc..8b698b9948 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -670,6 +670,19 @@ message PGetTabletVersionsResponse { repeated PVersion versions = 2; }; +message PGlobRequest { + optional string pattern = 1; +} + +message PGlobResponse { + message PFileInfo { + optional string file = 1; + optional int64 size = 2; + }; + required PStatus status = 1; + repeated PFileInfo files = 2; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -707,5 +720,6 @@ service PBackendService { rpc tablet_fetch_data(PTabletKeyLookupRequest) returns (PTabletKeyLookupResponse); rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns (PFetchColIdsResponse); rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse); + rpc glob(PGlobRequest) returns (PGlobResponse); }; diff --git a/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy new file mode 100644 index 0000000000..48782c4ad6 --- /dev/null +++ b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This suit test the `backends` tvf +suite("test_local_tvf") { + List<List<Object>> table = sql """ select * from backends(); """ + assertTrue(table.size() > 0) + def be_id = table[0][0] + + table = sql """ + select count(*) from local( + "file_path" = "log/be.out", + "backend_id" = "${be_id}", + "format" = "csv") + where c1 like "%start_time%";""" + + assertTrue(table.size() > 0) + assertTrue(Long.valueOf(table[0][0]) > 0) + + table = sql """ + select count(*) from local( + "file_path" = "log/*.out", + "backend_id" = "${be_id}", + "format" = "csv") + where c1 like "%start_time%";""" + + assertTrue(table.size() > 0) + assertTrue(Long.valueOf(table[0][0]) > 0) + + test { + sql """ + select count(*) from local( + "file_path" = "../log/be.out", + "backend_id" = "${be_id}", + "format" = "csv") + where c1 like "%start_time%"; + """ + // check exception message contains + exception "can not contain '..' in path" + } + + test { + sql """ + select count(*) from local( + "file_path" = "./log/xx.out", + "backend_id" = "${be_id}", + "format" = "csv") + where c1 like "%start_time%"; + """ + // check exception message contains + exception "No matches found" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org