This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 1adb4cfdd9 [Enhancement](tvf) Table value function support reading 
local file (#22915)
1adb4cfdd9 is described below

commit 1adb4cfdd954648b9aae96f4c91a271422fb711c
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sun Aug 13 19:39:16 2023 +0800

    [Enhancement](tvf) Table value function support reading local file (#22915)
    
    cherry pick #17404
---
 be/src/io/fs/err_utils.cpp                         |  20 +++
 be/src/io/fs/err_utils.h                           |   1 +
 be/src/io/fs/local_file_system.cpp                 |  50 +++++++
 be/src/io/fs/local_file_system.h                   |  11 ++
 be/src/olap/tablet_schema_cache.cpp                |   1 -
 be/src/service/internal_service.cpp                |  22 +++
 be/src/service/internal_service.h                  |   3 +
 be/test/io/fs/local_file_system_test.cpp           |  30 +++++
 docs/en/docs/admin-manual/config/be-config.md      |   5 +
 .../sql-functions/table-functions/local.md         | 150 +++++++++++++++++++++
 docs/sidebars.json                                 |   1 +
 docs/zh-CN/docs/admin-manual/config/be-config.md   |   5 +
 .../sql-functions/table-functions/local.md         | 147 ++++++++++++++++++++
 .../doris/analysis/TableValuedFunctionRef.java     |   8 +-
 .../doris/planner/external/FileQueryScanNode.java  |   4 +-
 .../apache/doris/planner/external/TVFScanNode.java |  19 +++
 .../org/apache/doris/rpc/BackendServiceClient.java |   4 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  12 ++
 .../ExternalFileTableValuedFunction.java           |  21 +--
 .../tablefunction/LocalTableValuedFunction.java    | 145 ++++++++++++++++++++
 .../doris/tablefunction/TableValuedFunctionIf.java |   2 +
 gensrc/proto/internal_service.proto                |  14 ++
 .../external_table_p0/tvf/test_local_tvf.groovy    |  67 +++++++++
 23 files changed, 729 insertions(+), 13 deletions(-)

diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 5530b25a0d..648a358850 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -53,5 +53,25 @@ std::string hdfs_error() {
     return ss.str();
 }
 
+// Render a glob() return code as "(<code>), <description>".
+// Codes other than 1, 2 and 3 are described as "unknown".
+std::string glob_err_to_str(int code) {
+    // Numeric values taken from
+    // https://sites.uclouvain.be/SystInfo/usr/include/glob.h.html
+    // NOTE(review): the comparison uses literal numbers rather than the GLOB_*
+    // macros; other platforms may number these codes differently -- confirm.
+    const char* description = "unknown";
+    if (code == 1) {
+        description = "Ran out of memory";
+    } else if (code == 2) {
+        description = "read error";
+    } else if (code == 3) {
+        description = "No matches found";
+    }
+    return fmt::format("({}), {}", code, description);
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/err_utils.h b/be/src/io/fs/err_utils.h
index 31ca702c32..971596fab1 100644
--- a/be/src/io/fs/err_utils.h
+++ b/be/src/io/fs/err_utils.h
@@ -26,6 +26,7 @@ namespace io {
 std::string errno_to_str();
 std::string errcode_to_str(const std::error_code& ec);
 std::string hdfs_error();
+std::string glob_err_to_str(int code);
 
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/local_file_system.cpp 
b/be/src/io/fs/local_file_system.cpp
index 01cd8829dd..48c1981202 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -19,6 +19,7 @@
 
 #include <fcntl.h>
 #include <fmt/format.h>
+#include <glob.h>
 #include <glog/logging.h>
 #include <openssl/md5.h>
 #include <sys/mman.h>
@@ -428,5 +429,54 @@ const std::shared_ptr<LocalFileSystem>& 
global_local_filesystem() {
     return local_fs;
 }
 
+// Canonicalize "<dir>/<file_path>" and store the result in "full_path".
+// Returns InvalidArgument when contain_path(dir, <canonical path>) is false;
+// any error from canonicalize() is returned unchanged.
+Status LocalFileSystem::canonicalize_local_file(const std::string& dir,
+                                                const std::string& file_path,
+                                                std::string* full_path) {
+    const std::string joined = dir + "/" + file_path;
+    std::string resolved;
+    RETURN_IF_ERROR(canonicalize(joined, &resolved));
+
+    if (contain_path(dir, resolved)) {
+        *full_path = resolved;
+        return Status::OK();
+    }
+    return Status::InvalidArgument("file path is not allowed: {}", resolved);
+}
+
+// Expand the glob pattern "path" under config::user_files_secure_path and
+// append one FileInfo (canonical file name + size) per match to "res".
+// A pattern containing ".." is rejected with InvalidArgument. If a later step
+// fails, entries already appended to "res" are left in place.
+Status LocalFileSystem::safe_glob(const std::string& path, std::vector<FileInfo>* res) {
+    if (path.find("..") != std::string::npos) {
+        return Status::InvalidArgument("can not contain '..' in path");
+    }
+
+    const std::string rooted_pattern = config::user_files_secure_path + "/" + path;
+    std::vector<std::string> matched;
+    RETURN_IF_ERROR(_glob(rooted_pattern, &matched));
+
+    for (const auto& match : matched) {
+        FileInfo info;
+        info.is_file = true;
+        // NOTE(review): the containment check runs against an empty "dir", not
+        // config::user_files_secure_path -- confirm this is intended.
+        RETURN_IF_ERROR(canonicalize_local_file("", match, &info.file_name));
+        RETURN_IF_ERROR(file_size_impl(info.file_name, &info.file_size));
+        res->push_back(std::move(info));
+    }
+    return Status::OK();
+}
+
+// Thin wrapper around glob(3): appends every path matching "pattern" to "res".
+// Returns IOError (with the decoded glob return code) when glob() fails.
+Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::string>* res) {
+    glob_t matches;
+    memset(&matches, 0, sizeof(matches));
+
+    const int rc = glob(pattern.c_str(), GLOB_TILDE, nullptr, &matches);
+    if (rc == 0) {
+        for (size_t idx = 0; idx < matches.gl_pathc; ++idx) {
+            res->emplace_back(matches.gl_pathv[idx]);
+        }
+    }
+    // Release the glob_t on both the success and the failure path.
+    globfree(&matches);
+
+    if (rc != 0) {
+        return Status::IOError("failed to glob {}: {}", pattern, glob_err_to_str(rc));
+    }
+    return Status::OK();
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index d9c0ec96c8..1f8d35c096 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -72,6 +72,15 @@ public:
     // read local file and save content to "content"
     Status read_file_to_string(const Path& file, std::string* content);
 
+    Status canonicalize_local_file(const std::string& dir, const std::string& 
file_path,
+                                   std::string* full_path);
+
+    // List the files that match the glob pattern "path".
+    // Results are appended to "res" as absolute paths together with their file sizes.
+    // "safe" means the pattern is prefixed with config::user_files_secure_path
+    // (and must not contain ".."), so that files outside
+    // config::user_files_secure_path can not be listed.
+    Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
+
 protected:
     Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
     Status open_file_impl(const FileDescription& file_desc, const Path& 
abs_path,
@@ -97,6 +106,8 @@ protected:
     Status delete_directory_or_file_impl(const Path& path);
 
 private:
+    // a wrapper for glob(), return file list in "res"
+    Status _glob(const std::string& pattern, std::vector<std::string>* res);
     LocalFileSystem(Path&& root_path, std::string&& id = "");
 };
 
diff --git a/be/src/olap/tablet_schema_cache.cpp 
b/be/src/olap/tablet_schema_cache.cpp
index ee14358495..e14c3f7ecc 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -74,7 +74,6 @@ void TabletSchemaCache::_recycle() {
         }
     }
     _is_stopped = true;
-    LOG(INFO) << "xxx yyy stopped ";
 }
 
 } // namespace doris
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 3a5aa06125..1bbecfca4e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -55,6 +55,7 @@
 #include "common/status.h"
 #include "gutil/integral_types.h"
 #include "http/http_client.h"
+#include "io/fs/local_file_system.h"
 #include "io/fs/stream_load_pipe.h"
 #include "io/io_common.h"
 #include "olap/data_dir.h"
@@ -1608,4 +1609,25 @@ void 
PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr
     
ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request, 
response);
 }
 
+// RPC handler: expands request->pattern() through LocalFileSystem::safe_glob on
+// the heavy work pool and fills the response with the matched files and the
+// resulting status. If the pool rejects the task, offer_failed() is called.
+void PInternalServiceImpl::glob(google::protobuf::RpcController* controller,
+                                const PGlobRequest* request, PGlobResponse* response,
+                                google::protobuf::Closure* done) {
+    auto do_glob = [request, response, done]() {
+        // Runs "done" when this task returns.
+        brpc::ClosureGuard closure_guard(done);
+        std::vector<io::FileInfo> matched;
+        Status status = io::global_local_filesystem()->safe_glob(request->pattern(), &matched);
+        if (status.ok()) {
+            for (const auto& entry : matched) {
+                PGlobResponse_PFileInfo* pfile = response->add_files();
+                pfile->set_file(entry.file_name);
+                pfile->set_size(entry.file_size);
+            }
+        }
+        // The status is reported for both success and failure.
+        status.to_protobuf(response->mutable_status());
+    };
+
+    if (!_heavy_work_pool.try_offer(std::move(do_glob))) {
+        offer_failed(response, done, _heavy_work_pool);
+    }
+}
+
 } // namespace doris
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index aa30959ca3..47762cf7e5 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -181,6 +181,9 @@ public:
                                     PGetTabletVersionsResponse* response,
                                     google::protobuf::Closure* done) override;
 
+    void glob(google::protobuf::RpcController* controller, const PGlobRequest* 
request,
+              PGlobResponse* response, google::protobuf::Closure* done) 
override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
                                         const PExecPlanFragmentRequest* 
request,
diff --git a/be/test/io/fs/local_file_system_test.cpp 
b/be/test/io/fs/local_file_system_test.cpp
index ea452782b2..953d7669b8 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -610,4 +610,34 @@ TEST_F(LocalFileSystemTest, TestRandomWrite) {
         EXPECT_TRUE(file_reader->close().ok());
     }
 }
+
+// Exercises LocalFileSystem::safe_glob: rejected patterns ("..", leading "/")
+// and two patterns that are each expected to match the two files under "1/".
+TEST_F(LocalFileSystemTest, TestGlob) {
+    const auto& fs = io::global_local_filesystem();
+    std::string path = "./be/ut_build_ASAN/test/file_path/";
+
+    // Start from a clean directory tree.
+    EXPECT_TRUE(fs->delete_directory(path).ok());
+    EXPECT_TRUE(fs->create_directory("./be/ut_build_ASAN/test/file_path/1").ok());
+    EXPECT_TRUE(fs->create_directory("./be/ut_build_ASAN/test/file_path/2").ok());
+    EXPECT_TRUE(fs->create_directory("./be/ut_build_ASAN/test/file_path/3").ok());
+
+    // Two files inside "1/", one directly under the test root.
+    save_string_file("./be/ut_build_ASAN/test/file_path/1/f1.txt", "just test");
+    save_string_file("./be/ut_build_ASAN/test/file_path/1/f2.txt", "just test");
+    save_string_file("./be/ut_build_ASAN/test/file_path/f3.txt", "just test");
+
+    std::vector<io::FileInfo> files;
+    // Patterns that must be refused or fail to match.
+    EXPECT_FALSE(fs->safe_glob("./../*.txt", &files).ok());
+    EXPECT_FALSE(fs->safe_glob("/*.txt", &files).ok());
+
+    EXPECT_TRUE(fs->safe_glob("./file_path/1/*.txt", &files).ok());
+    EXPECT_EQ(2, files.size());
+    files.clear();
+
+    // "*/" only descends one directory level, so f3.txt is not included.
+    EXPECT_TRUE(fs->safe_glob("./file_path/*/*.txt", &files).ok());
+    EXPECT_EQ(2, files.size());
+
+    EXPECT_TRUE(fs->delete_directory(path).ok());
+}
+
 } // namespace doris
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index 601577f252..88a044fdfd 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1465,3 +1465,8 @@ Indicates how many tablets failed to load in the data 
directory. At the same tim
 
 * Description: If true, when the process does not exceed the soft mem limit, 
the query memory will not be limited; when the process memory exceeds the soft 
mem limit, the query with the largest ratio between the currently used memory 
and the exec_mem_limit will be canceled. If false, cancel query when the memory 
used exceeds exec_mem_limit.
 * Default value: true
+
+#### `user_files_secure_path`
+
+* Description: The directory that holds the files readable by the `local` table-valued function. File paths passed to `local` are resolved relative to this directory.
+* Default value: `${DORIS_HOME}`
\ No newline at end of file
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/local.md 
b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md
new file mode 100644
index 0000000000..ab3be7d4ae
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md
@@ -0,0 +1,150 @@
+---
+{
+    "title": "local",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Local
+
+### Name
+
+<version since="dev">
+
+local
+
+</version>
+
+### Description
+
+The `local` table-valued function (TVF) allows users to read and query the contents of local files on a BE node as if they were a relational table. It currently supports the `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` file formats.
+
+The `ADMIN` privilege is required to use this function.
+
+#### syntax
+
+```sql
+local(
+  "file_path" = "path/to/file.txt", 
+  "backend_id" = "be_id",
+  "format" = "csv",
+  "keyn" = "valuen" 
+  ...
+  );
+```
+
+**parameter description**
+
+Related parameters for accessing local file on be node:
+
+- `file_path`:
+
+    (required) The path of the file to read, relative to the `user_files_secure_path` directory. `user_files_secure_path` is a [BE configuration item](../../../admin-manual/config/be-config.md).
+
+    The path must not contain `..`. Glob syntax is supported for matching multiple files, for example `log/*.log`.
+
+- `backend_id`:
+
+    (required) The backend id where the file resides. The `backend_id` can be 
obtained by `show backends` command.
+
+File format parameters:
+
+- `format`: (required) Currently support 
`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
+- `column_separator`: (optional) default `,`.
+- `line_delimiter`: (optional) default `\n`.
+- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, in which case the type is inferred automatically from the file name suffix.
+
+    The following 6 parameters are used for loading in json format. For 
specific usage methods, please refer to: [Json 
Load](../../../data-operate/import/import-way/load-json-format.md)
+
+- `read_json_by_line`: (optional) default `"true"`
+- `strip_outer_array`: (optional) default `"false"`
+- `json_root`: (optional) default `""`
+- `json_paths`: (optional) default `""`
+- `num_as_string`: (optional) default `false`
+- `fuzzy_parse`: (optional) default `false`
+
+    <version since="dev">The following 2 parameters are used for loading in 
csv format</version>
+
+- `trim_double_quotes`: Boolean type (optional), the default value is `false`. 
True means that the outermost double quotes of each field in the csv file are 
trimmed.
+- `skip_lines`: Integer type (optional), the default value is 0. It will skip 
some lines in the head of csv file. It will be disabled when the format is 
`csv_with_names` or `csv_with_names_and_types`.
+
+### Examples
+
+Analyze the log file on specified BE:
+
+```sql
+mysql> select * from local(
+        "file_path" = "log/be.out",
+        "backend_id" = "10006",
+        "format" = "csv")
+       where c1 like "%start_time%" limit 10;
++--------------------------------------------------------+
+| c1                                                     |
++--------------------------------------------------------+
+| start time: 2023年 08月 07日 星期一 23:20:32 CST       |
+| start time: 2023年 08月 07日 星期一 23:32:10 CST       |
+| start time: 2023年 08月 08日 星期二 00:20:50 CST       |
+| start time: 2023年 08月 08日 星期二 00:29:15 CST       |
++--------------------------------------------------------+
+```
+
+Read and access csv format files located at path `${DORIS_HOME}/student.csv`:
+
+```sql
+mysql> select * from local(
+      "file_path" = "student.csv", 
+      "backend_id" = "10003", 
+      "format" = "csv");
++------+---------+--------+
+| c1   | c2      | c3     |
++------+---------+--------+
+| 1    | alice   | 18     |
+| 2    | bob     | 20     |
+| 3    | jack    | 24     |
+| 4    | jackson | 19     |
+| 5    | liming  | d18    |
++------+---------+--------+
+```
+
+Can be used with `desc function` :
+
+```sql
+mysql> desc function local(
+      "file_path" = "student.csv", 
+      "backend_id" = "10003", 
+      "format" = "csv");
++-------+------+------+-------+---------+-------+
+| Field | Type | Null | Key   | Default | Extra |
++-------+------+------+-------+---------+-------+
+| c1    | TEXT | Yes  | false | NULL    | NONE  |
+| c2    | TEXT | Yes  | false | NULL    | NONE  |
+| c3    | TEXT | Yes  | false | NULL    | NONE  |
++-------+------+------+-------+---------+-------+
+```
+
+### Keywords
+
+    local, table-valued-function, tvf
+
+### Best Practice
+
+  For more detailed usage of the local tvf, please refer to the [S3](./s3.md) tvf. The only difference between them is the way the storage system is accessed.
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 52757c8f8d..a03377253e 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -700,6 +700,7 @@
                                 "sql-manual/sql-functions/table-functions/s3",
                                 
"sql-manual/sql-functions/table-functions/hdfs",
                                 
"sql-manual/sql-functions/table-functions/iceberg_meta",
+                                
"sql-manual/sql-functions/table-functions/local",
                                 
"sql-manual/sql-functions/table-functions/backends",
                                 
"sql-manual/sql-functions/table-functions/frontends",
                                 
"sql-manual/sql-functions/table-functions/workload-group",
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 291c711470..5e48d603f6 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1494,3 +1494,8 @@ load tablets from header failed, failed tablets size: 
xxx, path=xxx
 
 * 描述: 如果为true,则当内存未超过 exec_mem_limit 时,查询内存将不受限制;当进程内存超过 exec_mem_limit 且大于 
2GB 时,查询会被取消。如果为false,则在使用的内存超过 exec_mem_limit 时取消查询。
 * 默认值: true
+
+#### `user_files_secure_path`
+
+* 描述: `local` 表函数查询的文件的存储目录。
+* 默认值: `${DORIS_HOME}`
\ No newline at end of file
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md 
b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md
new file mode 100644
index 0000000000..e4ed638017
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md
@@ -0,0 +1,147 @@
+---
+{
+    "title": "local",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## local
+
+### Name
+
+<version since="dev">
+
+local
+
+</version>
+
+### Description
+
+Local表函数(table-valued-function,tvf),可以让用户像访问关系表格式数据一样,读取并访问 be 
上的文件内容。目前支持`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`文件格式。
+
+该函数需要 ADMIN 权限。
+
+#### syntax
+```sql
+local(
+  "file_path" = "path/to/file.txt", 
+  "backend_id" = "be_id",
+  "format" = "csv",
+  "keyn" = "valuen" 
+  ...
+  );
+```
+
+**参数说明**
+
+访问local文件的相关参数:
+- `file_path`
+
+    (必填)待读取文件的路径,该路径是一个相对于 `user_files_secure_path` 目录的相对路径, 其中 
`user_files_secure_path` 参数是 
[be的一个配置项](../../../admin-manual/config/be-config.md) 。
+
+    路径中不能包含 `..`,可以使用 glob 语法进行模糊匹配,如:`logs/*.log`
+
+- `backend_id`:
+
+    (必填)文件所在的 be id。 `backend_id` 可以通过 `show backends` 命令得到。
+
+文件格式相关参数
+- `format`:(必填) 目前支持 
`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
+- `column_separator`:(选填) 列分割符, 默认为`,`。 
+- `line_delimiter`:(选填) 行分割符,默认为`\n`。
+- `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 
`UNKNOWN`, 将会根据 `uri` 的后缀自动推断类型。
+
+    下面6个参数是用于json格式的导入,具体使用方法可以参照:[Json 
Load](../../../data-operate/import/import-way/load-json-format.md)
+
+- `read_json_by_line`: (选填) 默认为 `"true"`
+- `strip_outer_array`: (选填) 默认为 `"false"`
+- `json_root`: (选填) 默认为空
+- `json_paths`: (选填) 默认为空
+- `num_as_string`: (选填) 默认为 `false`
+- `fuzzy_parse`: (选填) 默认为 `false`
+
+    <version since="dev">下面2个参数是用于csv格式的导入</version>
+
+- `trim_double_quotes`: 布尔类型,选填,默认值为 `false`,为 `true` 时表示裁剪掉 csv 文件每个字段最外层的双引号
+- `skip_lines`: 整数类型,选填,默认值为0,含义为跳过csv文件的前几行。当设置format设置为 `csv_with_names` 或 
`csv_with_names_and_types` 时,该参数会失效 
+
+### Examples
+
+分析指定 BE 上的日志文件:
+
+```sql
+mysql> select * from local(
+        "file_path" = "log/be.out",
+        "backend_id" = "10006",
+        "format" = "csv")
+       where c1 like "%start_time%" limit 10;
++--------------------------------------------------------+
+| c1                                                     |
++--------------------------------------------------------+
+| start time: 2023年 08月 07日 星期一 23:20:32 CST       |
+| start time: 2023年 08月 07日 星期一 23:32:10 CST       |
+| start time: 2023年 08月 08日 星期二 00:20:50 CST       |
+| start time: 2023年 08月 08日 星期二 00:29:15 CST       |
++--------------------------------------------------------+
+```
+
+读取和访问位于路径`${DORIS_HOME}/student.csv`的 csv格式文件:
+
+```sql
+mysql> select * from local(
+      "file_path" = "student.csv", 
+      "backend_id" = "10003", 
+      "format" = "csv");
++------+---------+--------+
+| c1   | c2      | c3     |
++------+---------+--------+
+| 1    | alice   | 18     |
+| 2    | bob     | 20     |
+| 3    | jack    | 24     |
+| 4    | jackson | 19     |
+| 5    | liming  | d18    |
++------+---------+--------+
+```
+
+可以配合`desc function`使用
+
+```sql
+mysql> desc function local(
+      "file_path" = "student.csv", 
+      "backend_id" = "10003", 
+      "format" = "csv");
++-------+------+------+-------+---------+-------+
+| Field | Type | Null | Key   | Default | Extra |
++-------+------+------+-------+---------+-------+
+| c1    | TEXT | Yes  | false | NULL    | NONE  |
+| c2    | TEXT | Yes  | false | NULL    | NONE  |
+| c3    | TEXT | Yes  | false | NULL    | NONE  |
++-------+------+------+-------+---------+-------+
+```
+
+### Keywords
+
+    local, table-valued-function, tvf
+
+### Best Practice
+
+  关于local tvf的更详细使用方法可以参照 [S3](./s3.md) tvf, 唯一不同的是访问存储系统的方式不一样。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
index ba1b07eb4c..166b3297ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
@@ -27,6 +27,7 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.tablefunction.BackendsTableValuedFunction;
+import org.apache.doris.tablefunction.LocalTableValuedFunction;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 
 import java.util.Map;
@@ -103,11 +104,12 @@ public class TableValuedFunctionRef extends TableRef {
             return;
         }
 
-        // check privilige for backends tvf
-        if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)) {
+        // check privilege for backends/local tvf
+        if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)
+                || funcName.equalsIgnoreCase(LocalTableValuedFunction.NAME)) {
             if 
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), 
PrivPredicate.ADMIN)
                     && 
!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
-                                                                            
PrivPredicate.OPERATOR)) {
+                    PrivPredicate.OPERATOR)) {
                 
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"ADMIN/OPERATOR");
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 586baa58d0..db8f835abb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -364,7 +364,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                     params.addToBrokerAddresses(new 
TNetworkAddress(broker.host, broker.port));
                 }
             }
-        } else if (locationType == TFileType.FILE_S3 && 
!params.isSetProperties()) {
+        } else if ((locationType == TFileType.FILE_S3 || locationType == 
TFileType.FILE_LOCAL)
+                && !params.isSetProperties()) {
             params.setProperties(locationProperties);
         }
 
@@ -405,6 +406,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         } else if (locationType == TFileType.FILE_S3
                 || locationType == TFileType.FILE_BROKER
+                || locationType == TFileType.FILE_LOCAL
                 || locationType == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 476e16b098..d895856401 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FunctionGenTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
@@ -27,7 +28,9 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Backend;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.LocalTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileCompressType;
@@ -40,6 +43,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -62,6 +66,21 @@ public class TVFScanNode extends FileQueryScanNode {
     }
 
     @Override
+    protected void initBackendPolicy() throws UserException {
+        // Stays empty unless the function is a local tvf.
+        List<String> preferredHosts = new ArrayList<>();
+        if (tableValuedFunction instanceof LocalTableValuedFunction) {
+            // A local tvf names its backend explicitly through backendId,
+            // so that backend's host is the only preferred location.
+            LocalTableValuedFunction localTvf = (LocalTableValuedFunction) tableValuedFunction;
+            Long backendId = localTvf.getBackendId();
+            Backend target = Env.getCurrentSystemInfo().getBackend(backendId);
+            if (target == null) {
+                throw new UserException("Backend " + backendId + " does not exist");
+            }
+            preferredHosts.add(target.getHost());
+        }
+        backendPolicy.init(preferredHosts);
+        numNodes = backendPolicy.numBackends();
+    }
+
     protected String getFsName(FileSplit split) {
         return tableValuedFunction.getFsName();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 484f6e77ed..8df33927fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -142,6 +142,10 @@ public class BackendServiceClient {
         return stub.getColumnIdsByTabletIds(request);
     }
 
+    /**
+     * Forwards a glob request to the backend stub.
+     *
+     * @param request the glob request to send
+     * @return the future returned by the stub for this request
+     */
+    public Future<InternalService.PGlobResponse> glob(InternalService.PGlobRequest request) {
+        Future<InternalService.PGlobResponse> pending = stub.glob(request);
+        return pending;
+    }
+
     public void shutdown() {
         if (!channel.isShutdown()) {
             channel.shutdown();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 39dfd7915f..5fc3ef2815 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -387,4 +387,16 @@ public class BackendServiceProxy {
         }
     }
 
+    /**
+     * Sends a glob request to the backend at the given address.
+     *
+     * @param address address of the target backend
+     * @param request the glob request to send
+     * @return the future returned by the backend client for this request
+     * @throws RpcException if obtaining the client or issuing the call throws
+     */
+    public Future<InternalService.PGlobResponse> glob(TNetworkAddress address,
+            InternalService.PGlobRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.glob(request);
+        } catch (Throwable e) {
+            // The throwable is the trailing argument (no placeholder for it) so that
+            // the logger records the cause and stack trace after "error: ".
+            LOG.warn("failed to glob dir from BE {}:{}, path: {}, error: ",
+                    address.getHostname(), address.getPort(), request.getPattern(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 7e1a4b6698..035d54c0d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.PrimitiveType;
@@ -347,18 +348,13 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
             return columns;
         }
         // get one BE address
-        TNetworkAddress address = null;
         columns = Lists.newArrayList();
-        for (Backend be : 
org.apache.doris.catalog.Env.getCurrentSystemInfo().getIdToBackend().values()) {
-            if (be.isAlive()) {
-                address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-                break;
-            }
-        }
-        if (address == null) {
+        Backend be = getBackend();
+        if (be == null) {
             throw new AnalysisException("No Alive backends");
         }
 
+        TNetworkAddress address = new TNetworkAddress(be.getHost(), 
be.getBrpcPort());
         try {
             PFetchTableSchemaRequest request = getFetchTableStructureRequest();
             Future<InternalService.PFetchTableSchemaResult> future = 
BackendServiceProxy.getInstance()
@@ -390,6 +386,15 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         return columns;
     }
 
+    /**
+     * Picks the backend used by this function.
+     *
+     * @return the first backend, in iteration order, that reports itself alive; null if none does
+     */
+    protected Backend getBackend() {
+        return Env.getCurrentSystemInfo().getIdToBackend().values().stream()
+                .filter(Backend::isAlive)
+                .findFirst()
+                .orElse(null);
+    }
+
     /**
      * Convert PTypeDesc into doris column type
      * @param typeNodes list PTypeNodes in PTypeDesc
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
new file mode 100644
index 0000000000..f6693317ba
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGlobResponse;
+import org.apache.doris.proto.InternalService.PGlobResponse.PFileInfo;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The implementation of the table valued function
+ * local("file_path" = "path/to/file.txt", "backend_id" = "be_id").
+ *
+ * <p>The list of matching files is fetched from the given backend through a glob RPC
+ * while the function is being constructed.
+ */
+public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class);
+
+    public static final String NAME = "local";
+    public static final String FILE_PATH = "file_path";
+    public static final String BACKEND_ID = "backend_id";
+
+    // Maximum time, in seconds, to wait for the backend to answer the glob request.
+    private static final long GLOB_TIMEOUT_SECONDS = 5;
+
+    private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
+            .add(FILE_PATH)
+            .add(BACKEND_ID)
+            .build();
+
+    private String filePath;
+    private long backendId;
+
+    /**
+     * Builds the function from its user-supplied properties and fetches the file list.
+     *
+     * @param params properties of the function; keys are matched case-insensitively against
+     *               the file format properties and the location properties
+     * @throws AnalysisException if a property is unknown, a required property is missing,
+     *                           "backend_id" is not a number, or the file list cannot be fetched
+     */
+    public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> fileFormatParams = new CaseInsensitiveMap();
+        locationProperties = Maps.newHashMap();
+        for (Map.Entry<String, String> entry : params.entrySet()) {
+            String key = entry.getKey();
+            String lowerKey = key.toLowerCase();
+            if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
+                fileFormatParams.put(key, entry.getValue());
+            } else if (LOCATION_PROPERTIES.contains(lowerKey)) {
+                locationProperties.put(lowerKey, entry.getValue());
+            } else {
+                throw new AnalysisException(key + " is invalid property");
+            }
+        }
+
+        if (!locationProperties.containsKey(FILE_PATH)) {
+            throw new AnalysisException(String.format("Configuration '%s' is required.", FILE_PATH));
+        }
+        if (!locationProperties.containsKey(BACKEND_ID)) {
+            throw new AnalysisException(String.format("Configuration '%s' is required.", BACKEND_ID));
+        }
+
+        filePath = locationProperties.get(FILE_PATH);
+        String backendIdStr = locationProperties.get(BACKEND_ID);
+        try {
+            backendId = Long.parseLong(backendIdStr);
+        } catch (NumberFormatException e) {
+            // Report a malformed id as an analysis error instead of leaking an unchecked exception.
+            throw new AnalysisException(
+                    String.format("Configuration '%s' must be a number, but got: %s", BACKEND_ID, backendIdStr));
+        }
+        parseProperties(fileFormatParams);
+
+        getFileListFromBackend();
+    }
+
+    /**
+     * Asks the backend identified by backendId to glob filePath and records every
+     * returned file in fileStatuses.
+     *
+     * @throws AnalysisException if the backend is unknown, the RPC fails or times out,
+     *                           or the backend answers with a non-zero status code
+     */
+    private void getFileListFromBackend() throws AnalysisException {
+        Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+        if (be == null) {
+            throw new AnalysisException("backend not found with backend_id = " + backendId);
+        }
+
+        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+        TNetworkAddress address = be.getBrpcAdress();
+        InternalService.PGlobRequest.Builder requestBuilder = InternalService.PGlobRequest.newBuilder();
+        requestBuilder.setPattern(filePath);
+        try {
+            Future<PGlobResponse> response = proxy.glob(address, requestBuilder.build());
+            PGlobResponse globResponse = response.get(GLOB_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            if (globResponse.getStatus().getStatusCode() != 0) {
+                throw new AnalysisException(
+                        "error code: " + globResponse.getStatus().getStatusCode()
+                                + ", error msg: " + globResponse.getStatus().getErrorMsgsList());
+            }
+            for (PFileInfo file : globResponse.getFilesList()) {
+                fileStatuses.add(new TBrokerFileStatus(file.getFile().trim(), false, file.getSize(), true));
+                LOG.info("get file from backend success. file: {}, size: {}", file.getFile(), file.getSize());
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new AnalysisException("get file list from backend failed. " + e.getMessage());
+        } catch (Exception e) {
+            // Also rewraps the AnalysisException raised above for a non-zero status code,
+            // so every failure surfaces with the same message prefix.
+            throw new AnalysisException("get file list from backend failed. " + e.getMessage());
+        }
+    }
+
+    @Override
+    public TFileType getTFileType() {
+        return TFileType.FILE_LOCAL;
+    }
+
+    @Override
+    public String getFilePath() {
+        return filePath;
+    }
+
+    @Override
+    public BrokerDesc getBrokerDesc() {
+        return new BrokerDesc("LocalTvfBroker", StorageType.LOCAL, locationProperties);
+    }
+
+    @Override
+    public String getTableName() {
+        return "LocalTableValuedFunction";
+    }
+
+    /** Returns the id of the backend given in the "backend_id" property. */
+    public Long getBackendId() {
+        return backendId;
+    }
+
+    /** Returns the backend named by "backend_id" instead of an arbitrary alive one. */
+    @Override
+    protected Backend getBackend() {
+        return Env.getCurrentSystemInfo().getBackend(backendId);
+    }
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index ea135b8b1b..2c67178494 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf {
                 return new S3TableValuedFunction(params);
             case HdfsTableValuedFunction.NAME:
                 return new HdfsTableValuedFunction(params);
+            case LocalTableValuedFunction.NAME:
+                return new LocalTableValuedFunction(params);
             case IcebergTableValuedFunction.NAME:
                 return new IcebergTableValuedFunction(params);
             case BackendsTableValuedFunction.NAME:
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index de1f7876dc..8b698b9948 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -670,6 +670,19 @@ message PGetTabletVersionsResponse {
     repeated PVersion versions = 2;
 };
 
+// Request of the glob rpc.
+message PGlobRequest {
+    // Path pattern to be matched on the backend that receives the request.
+    optional string pattern = 1;
+}
+
+// Response of the glob rpc.
+message PGlobResponse {
+    // Description of one file returned for the requested pattern.
+    message PFileInfo {
+        // Name/path of the file as reported by the backend.
+        optional string file = 1;
+        // Size of the file (presumably in bytes; confirm against the backend implementation).
+        optional int64 size = 2;
+    };
+    // Outcome of the request; the FE treats a non-zero status code as failure.
+    required PStatus status = 1;
+    // Files returned for the pattern.
+    repeated PFileInfo files = 2;
+}
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
     rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -707,5 +720,6 @@ service PBackendService {
     rpc tablet_fetch_data(PTabletKeyLookupRequest) returns 
(PTabletKeyLookupResponse);
     rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns 
(PFetchColIdsResponse);
     rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns 
(PGetTabletVersionsResponse);
+    rpc glob(PGlobRequest) returns (PGlobResponse);
 };
 
diff --git a/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy 
b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy
new file mode 100644
index 0000000000..48782c4ad6
--- /dev/null
+++ b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This suite tests the `local` tvf
+suite("test_local_tvf") {
+    // Use the first column of the first row returned by backends() as the backend id.
+    List<List<Object>> table =  sql """ select * from backends(); """
+    assertTrue(table.size() > 0)
+    def be_id = table[0][0]
+
+    // Case 1: read a single file given by a plain relative path.
+    table = sql """
+        select count(*) from local(
+            "file_path" = "log/be.out",
+            "backend_id" = "${be_id}",
+            "format" = "csv")
+        where c1 like "%start_time%";"""
+
+    assertTrue(table.size() > 0)
+    assertTrue(Long.valueOf(table[0][0]) > 0)
+
+    // Case 2: same query, but the path contains a wildcard.
+    table = sql """
+        select count(*) from local(
+            "file_path" = "log/*.out",
+            "backend_id" = "${be_id}",
+            "format" = "csv")
+        where c1 like "%start_time%";"""
+
+    assertTrue(table.size() > 0)
+    assertTrue(Long.valueOf(table[0][0]) > 0)
+
+    // Case 3: a path containing ".." is expected to be rejected.
+    test {
+        sql """
+        select count(*) from local(
+            "file_path" = "../log/be.out",
+            "backend_id" = "${be_id}",
+            "format" = "csv")
+        where c1 like "%start_time%";
+        """
+        // check exception message contains
+        exception "can not contain '..' in path"
+    }
+
+    // Case 4: a path that matches no file is expected to fail with "No matches found".
+    test {
+        sql """
+        select count(*) from local(
+            "file_path" = "./log/xx.out",
+            "backend_id" = "${be_id}",
+            "format" = "csv")
+        where c1 like "%start_time%";
+        """
+        // check exception message contains
+        exception "No matches found"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to