This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ba3b6c66f [opt](FileCache) use modification time to determine whether 
the file is changed (#18906)
3ba3b6c66f is described below

commit 3ba3b6c66f9db38d980cafc4290c9217721b4e37
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu May 11 07:50:39 2023 +0800

    [opt](FileCache) use modification time to determine whether the file is 
changed (#18906)
    
    Get the last modification time from the file status, and use the combination 
of the path and the modification time to generate the cache identifier.
    When a file is changed, its modification time changes as well, so the 
cache identifier built from the former modification time no longer matches.
---
 be/src/io/cache/block/cached_remote_file_reader.cpp | 13 +++++++++----
 be/src/io/cache/block/cached_remote_file_reader.h   |  5 +++--
 be/src/io/fs/file_reader_options.h                  |  2 ++
 be/src/io/fs/remote_file_system.cpp                 |  6 ++++--
 be/src/vec/exec/format/csv/csv_reader.cpp           |  4 ++++
 be/src/vec/exec/format/json/new_json_reader.cpp     |  2 ++
 be/src/vec/exec/format/orc/vorc_reader.cpp          |  2 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp  |  2 ++
 .../doris/catalog/HiveMetaStoreClientHelper.java    |  1 +
 .../org/apache/doris/common/util/BrokerUtil.java    |  1 +
 .../doris/datasource/hive/HiveMetaStoreCache.java   |  3 +++
 .../apache/doris/fs/remote/BrokerFileSystem.java    |  2 +-
 .../java/org/apache/doris/fs/remote/RemoteFile.java | 21 ++++++++++++++++++---
 .../apache/doris/fs/remote/RemoteFileSystem.java    |  4 ++--
 .../org/apache/doris/fs/remote/S3FileSystem.java    |  2 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java   |  2 +-
 .../doris/planner/external/FileGroupInfo.java       |  1 +
 .../doris/planner/external/FileQueryScanNode.java   |  1 +
 .../apache/doris/planner/external/FileScanNode.java | 10 ++++++----
 .../apache/doris/planner/external/FileSplit.java    |  9 ++++++++-
 .../apache/doris/planner/external/HiveScanNode.java |  2 +-
 .../apache/doris/planner/external/TVFScanNode.java  | 13 +------------
 .../planner/external/iceberg/IcebergSplit.java      |  1 +
 .../ExternalFileTableValuedFunction.java            |  1 +
 .../apache/doris/broker/hdfs/FileSystemManager.java |  1 +
 gensrc/thrift/PaloBrokerService.thrift              |  1 +
 gensrc/thrift/PlanNodes.thrift                      |  2 ++
 27 files changed, 80 insertions(+), 34 deletions(-)

diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp 
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index f2c4c80173..81268ec5ad 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -42,17 +42,22 @@ namespace doris {
 namespace io {
 
 CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader,
-                                               const std::string& cache_path)
+                                               const std::string& cache_path,
+                                               const long modification_time)
         : _remote_file_reader(std::move(remote_file_reader)) {
-    _cache_key = IFileCache::hash(cache_path);
+    // Use path and modification time to build cache key
+    std::string unique_path = fmt::format("{}:{}", cache_path, 
modification_time);
+    _cache_key = IFileCache::hash(unique_path);
     _cache = FileCacheFactory::instance().get_by_path(_cache_key);
 }
 
 CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr 
remote_file_reader,
                                                const std::string& 
cache_base_path,
-                                               const std::string& cache_path)
+                                               const std::string& cache_path,
+                                               const long modification_time)
         : _remote_file_reader(std::move(remote_file_reader)) {
-    _cache_key = IFileCache::hash(cache_path);
+    std::string unique_path = fmt::format("{}:{}", cache_path, 
modification_time);
+    _cache_key = IFileCache::hash(unique_path);
     _cache = FileCacheFactory::instance().get_by_path(cache_base_path);
     if (_cache == nullptr) {
         LOG(WARNING) << "Can't get cache from base path: " << cache_base_path
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h 
b/be/src/io/cache/block/cached_remote_file_reader.h
index 68b80e0ce4..5e66f9970a 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.h
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -39,10 +39,11 @@ struct FileCacheStatistics;
 
 class CachedRemoteFileReader final : public FileReader {
 public:
-    CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const 
std::string& cache_path);
+    CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const 
std::string& cache_path,
+                           const long modification_time);
 
     CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const 
std::string& cache_base_path,
-                           const std::string& cache_path);
+                           const std::string& cache_path, const long 
modification_time);
 
     ~CachedRemoteFileReader() override;
 
diff --git a/be/src/io/fs/file_reader_options.h 
b/be/src/io/fs/file_reader_options.h
index 66629f54de..744db5e1f6 100644
--- a/be/src/io/fs/file_reader_options.h
+++ b/be/src/io/fs/file_reader_options.h
@@ -80,6 +80,8 @@ public:
     int64_t file_size = -1;
     bool has_cache_base_path = false;
     std::string cache_base_path;
+    // Use modification time to determine whether the file is changed
+    int64_t modification_time = 0;
 
     void specify_cache_path(const std::string& base_path) {
         has_cache_base_path = true;
diff --git a/be/src/io/fs/remote_file_system.cpp 
b/be/src/io/fs/remote_file_system.cpp
index 63246a3c87..611f7cf894 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -97,9 +97,11 @@ Status RemoteFileSystem::open_file_impl(const Path& path, 
const FileReaderOption
         if (reader_options.has_cache_base_path) {
             // from query session variable: file_cache_base_path
             *reader = std::make_shared<CachedRemoteFileReader>(
-                    std::move(raw_reader), reader_options.cache_base_path, 
cache_path);
+                    std::move(raw_reader), reader_options.cache_base_path, 
cache_path,
+                    reader_options.modification_time);
         } else {
-            *reader = 
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path);
+            *reader = 
std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path,
+                                                               
reader_options.modification_time);
         }
         break;
     }
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index cff15c88bb..7cb36e94c7 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -170,6 +170,8 @@ Status CsvReader::init_reader(bool is_load) {
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
     } else {
         io::FileReaderOptions reader_options = 
FileFactory::get_reader_options(_state);
+        reader_options.modification_time =
+                _range.__isset.modification_time ? _range.modification_time : 
0;
         RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, 
&_file_system, &_file_reader,
                 io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, 
_io_ctx,
@@ -657,6 +659,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
 
     _file_description.start_offset = start_offset;
     io::FileReaderOptions reader_options = 
FileFactory::get_reader_options(_state);
+    reader_options.modification_time =
+            _range.__isset.modification_time ? _range.modification_time : 0;
     RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_system_properties, _file_description,
                                                     &_file_system, 
&_file_reader, reader_options));
     if (_file_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 9643e48ab6..daa549776b 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -375,6 +375,8 @@ Status NewJsonReader::_open_file_reader() {
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
     } else {
         io::FileReaderOptions reader_options = 
FileFactory::get_reader_options(_state);
+        reader_options.modification_time =
+                _range.__isset.modification_time ? _range.modification_time : 
0;
         RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, 
&_file_system, &_file_reader,
                 io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, 
_io_ctx,
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index ea36eec962..0f13b4d191 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -209,6 +209,8 @@ Status OrcReader::_create_file_reader() {
     if (_file_input_stream == nullptr) {
         io::FileReaderSPtr inner_reader;
         io::FileReaderOptions reader_options = 
FileFactory::get_reader_options(_state);
+        reader_options.modification_time =
+                _scan_range.__isset.modification_time ? 
_scan_range.modification_time : 0;
         RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, 
&_file_system, &inner_reader,
                 io::DelegateReader::AccessMode::RANDOM, reader_options, 
_io_ctx));
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 40d3492229..a9bf8dc6c8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -209,6 +209,8 @@ Status ParquetReader::_open_file() {
         SCOPED_RAW_TIMER(&_statistics.open_file_time);
         ++_statistics.open_file_num;
         io::FileReaderOptions reader_options = 
FileFactory::get_reader_options(_state);
+        reader_options.modification_time =
+                _scan_range.__isset.modification_time ? 
_scan_range.modification_time : 0;
         RETURN_IF_ERROR(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, 
&_file_system, &_file_reader,
                 io::DelegateReader::AccessMode::RANDOM, reader_options, 
_io_ctx));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 938f659159..184e61b1d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -233,6 +233,7 @@ public class HiveMetaStoreClientHelper {
                     brokerFileStatus.setIsDir(fileLocation.isDirectory());
                     brokerFileStatus.setIsSplitable(true);
                     brokerFileStatus.setSize(fileLocation.getSize());
+                    
brokerFileStatus.setModificationTime(fileLocation.getModificationTime());
                     // filePath.toUri().getPath() = 
"/path/to/partition/file_name"
                     // eg: 
/home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse
                     //     + /dae.db/customer/state=CA/city=SanJose/000000_0
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 7fa0beda8d..0444abf443 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -102,6 +102,7 @@ public class BrokerUtil {
             if (r.isFile()) {
                 TBrokerFileStatus status = new TBrokerFileStatus(r.getName(), 
!r.isFile(), r.getSize(), r.isFile());
                 status.setBlockSize(r.getBlockSize());
+                status.setModificationTime(r.getModificationTime());
                 fileStatuses.add(status);
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 3bd7daade5..f6382b80da 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -324,6 +324,7 @@ public class HiveMetaStoreCache {
                     // Convert the hadoop split to Doris Split.
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = 
((org.apache.hadoop.mapred.FileSplit) splits[i]);
+                        // todo: get modification time
                         result.addSplit(new FileSplit(fs.getPath(), 
fs.getStart(), fs.getLength(), -1, null, null));
                     }
                 }
@@ -802,6 +803,7 @@ public class HiveMetaStoreCache {
             status.setPath(file.getPath());
             status.length = file.getSize();
             status.blockSize = file.getBlockSize();
+            status.modificationTime = file.getModificationTime();
             files.add(status);
         }
 
@@ -823,6 +825,7 @@ public class HiveMetaStoreCache {
         Path path;
         long length;
         long blockSize;
+        long modificationTime;
     }
 
     @Data
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index 14befa5c16..a2eb5560a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -581,7 +581,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
 
             List<TBrokerFileStatus> fileStatus = rep.getFiles();
             for (TBrokerFileStatus tFile : fileStatus) {
-                RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, 
tFile.size, 0);
+                RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, 
tFile.size, 0, tFile.getModificationTime());
                 result.add(file);
             }
             LOG.info("finished to list remote path {}. get files: {}", 
remotePath, result);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
index cdfcf451c3..a8d918cffa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java
@@ -33,25 +33,36 @@ public class RemoteFile {
     // A large file will split into multiple blocks. The blocks are 
transparent to the user.
     // Default block size for HDFS 2.x is 128M.
     private final long blockSize;
+    private long modificationTime;
     private Path path;
     BlockLocation[] blockLocations;
 
     public RemoteFile(String name, boolean isFile, long size, long blockSize) {
-        this(name, null, isFile, !isFile, size, blockSize, null);
+        this(name, null, isFile, !isFile, size, blockSize, 0, null);
+    }
+
+    public RemoteFile(String name, boolean isFile, long size, long blockSize, 
long modificationTime) {
+        this(name, null, isFile, !isFile, size, blockSize, modificationTime, 
null);
     }
 
     public RemoteFile(Path path, boolean isDirectory, long size, long 
blockSize, BlockLocation[] blockLocations) {
-        this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 
blockLocations);
+        this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 
0, blockLocations);
+    }
+
+    public RemoteFile(Path path, boolean isDirectory, long size, long 
blockSize, long modificationTime,
+            BlockLocation[] blockLocations) {
+        this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 
modificationTime, blockLocations);
     }
 
     public RemoteFile(String name, Path path, boolean isFile, boolean 
isDirectory,
-                      long size, long blockSize, BlockLocation[] 
blockLocations) {
+            long size, long blockSize, long modificationTime, BlockLocation[] 
blockLocations) {
         Preconditions.checkState(!Strings.isNullOrEmpty(name));
         this.name = name;
         this.isFile = isFile;
         this.isDirectory = isDirectory;
         this.size = size;
         this.blockSize = blockSize;
+        this.modificationTime = modificationTime;
         this.path = path;
         this.blockLocations = blockLocations;
     }
@@ -80,6 +91,10 @@ public class RemoteFile {
         return blockSize;
     }
 
+    public long getModificationTime() {
+        return modificationTime;
+    }
+
     public BlockLocation[] getBlockLocations() {
         return blockLocations;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 35710f8785..7d87993733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -58,8 +58,8 @@ public abstract class RemoteFileSystem extends 
PersistentFileSystem {
         List<RemoteFile> locations = new ArrayList<>();
         while (locatedFiles.hasNext()) {
             LocatedFileStatus fileStatus = locatedFiles.next();
-            RemoteFile location = new RemoteFile(fileStatus.getPath(), 
fileStatus.isDirectory(),
-                    fileStatus.getLen(), fileStatus.getBlockSize(), 
fileStatus.getBlockLocations());
+            RemoteFile location = new RemoteFile(fileStatus.getPath(), 
fileStatus.isDirectory(), fileStatus.getLen(),
+                    fileStatus.getBlockSize(), 
fileStatus.getModificationTime(), fileStatus.getBlockLocations());
             locations.add(location);
         }
         return new RemoteFiles(locations);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index c6cfc0ba02..0d09037c81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -79,7 +79,7 @@ public class S3FileSystem extends ObjFileSystem {
                 RemoteFile remoteFile = new RemoteFile(
                         fileNameOnly ? fileStatus.getPath().getName() : 
fileStatus.getPath().toString(),
                         !fileStatus.isDirectory(), fileStatus.isDirectory() ? 
-1 : fileStatus.getLen(),
-                        fileStatus.getBlockSize());
+                        fileStatus.getBlockSize(), 
fileStatus.getModificationTime());
                 result.add(remoteFile);
             }
         } catch (FileNotFoundException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 26478782a7..d8df27a5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -431,7 +431,7 @@ public class DFSFileSystem extends RemoteFileSystem {
                 RemoteFile remoteFile = new RemoteFile(
                         fileNameOnly ? fileStatus.getPath().getName() : 
fileStatus.getPath().toString(),
                         !fileStatus.isDirectory(), fileStatus.isDirectory() ? 
-1 : fileStatus.getLen(),
-                        fileStatus.getBlockSize());
+                        fileStatus.getBlockSize(), 
fileStatus.getModificationTime());
                 result.add(remoteFile);
             }
         } catch (FileNotFoundException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index a1e3499382..7808c35ac6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -337,6 +337,7 @@ public class FileGroupInfo {
             rangeDesc.setSize(fileStatus.size);
             rangeDesc.setFileSize(fileStatus.size);
         }
+        rangeDesc.setModificationTime(fileStatus.getModificationTime());
         return rangeDesc;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 86e3dab30f..a36369325e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -306,6 +306,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
+        rangeDesc.setModificationTime(fileSplit.getModificationTime());
         return rangeDesc;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 3a24ba3319..aa0e923203 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -212,7 +212,7 @@ public class FileScanNode extends ExternalScanNode {
     }
 
     protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] 
blockLocations, long length,
-                                  boolean splittable, List<String> 
partitionValues) throws IOException {
+            long modificationTime, boolean splittable, List<String> 
partitionValues) throws IOException {
         if (blockLocations == null) {
             blockLocations = new BlockLocation[0];
         }
@@ -226,7 +226,7 @@ public class FileScanNode extends ExternalScanNode {
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", path);
             String[] hosts = blockLocations.length == 0 ? null : 
blockLocations[0].getHosts();
-            result.add(new FileSplit(path, 0, length, length, hosts, 
partitionValues));
+            result.add(new FileSplit(path, 0, length, length, 
modificationTime, hosts, partitionValues));
             return result;
         }
         long bytesRemaining;
@@ -234,12 +234,14 @@ public class FileScanNode extends ExternalScanNode {
                 bytesRemaining -= splitSize) {
             int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
             String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(new FileSplit(path, length - bytesRemaining, splitSize, 
length, hosts, partitionValues));
+            result.add(new FileSplit(path, length - bytesRemaining, splitSize,
+                    length, modificationTime, hosts, partitionValues));
         }
         if (bytesRemaining != 0L) {
             int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
             String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(new FileSplit(path, length - bytesRemaining, 
bytesRemaining, length, hosts, partitionValues));
+            result.add(new FileSplit(path, length - bytesRemaining, 
bytesRemaining,
+                    length, modificationTime, hosts, partitionValues));
         }
 
         LOG.debug("Path {} includes {} splits.", path, result.size());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 03f1ab3b60..21b88f9c2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -34,6 +34,7 @@ public class FileSplit implements Split {
     // -1 means unset.
     // If the file length is not set, the file length will be fetched from the 
file system.
     protected long fileLength;
+    protected long modificationTime;
     protected String[] hosts;
     protected TableFormatType tableFormatType;
     // The values of partitions.
@@ -42,15 +43,21 @@ public class FileSplit implements Split {
     protected List<String> partitionValues;
 
     public FileSplit(Path path, long start, long length, long fileLength,
-                     String[] hosts, List<String> partitionValues) {
+            long modificationTime, String[] hosts, List<String> 
partitionValues) {
         this.path = path;
         this.start = start;
         this.length = length;
         this.fileLength = fileLength;
+        this.modificationTime = modificationTime;
         this.hosts = hosts == null ? new String[0] : hosts;
         this.partitionValues = partitionValues;
     }
 
+    public FileSplit(Path path, long start, long length, long fileLength,
+            String[] hosts, List<String> partitionValues) {
+        this(path, start, length, fileLength, 0, hosts, partitionValues);
+    }
+
     public String[] getHosts() {
         return hosts;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 96c1872010..87d0e55796 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -170,7 +170,7 @@ public class HiveScanNode extends FileQueryScanNode {
                 boolean isSplittable = fileCacheValue.isSplittable();
                 for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
                     allFiles.addAll(splitFile(status.getPath(), 
status.getBlockSize(),
-                            status.getBlockLocations(), status.getLength(),
+                            status.getBlockLocations(), status.getLength(), 
status.getModificationTime(),
                             isSplittable, 
fileCacheValue.getPartitionValues()));
                 }
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index a8ede0d843..67008f5d7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -111,7 +111,7 @@ public class TVFScanNode extends FileQueryScanNode {
             Path path = new Path(fileStatus.getPath());
             try {
                 splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, 
fileStatus.getSize(),
-                        fileStatus.isSplitable, null));
+                        fileStatus.getModificationTime(), 
fileStatus.isSplitable, null));
             } catch (IOException e) {
                 LOG.warn("get file split failed for TVF: {}", path, e);
                 throw new UserException(e);
@@ -119,15 +119,4 @@ public class TVFScanNode extends FileQueryScanNode {
         }
         return splits;
     }
-
-    private void addFileSplits(Path path, long fileSize, long splitSize, 
List<Split> splits) {
-        long bytesRemaining;
-        for (bytesRemaining = fileSize; (double) bytesRemaining / (double) 
splitSize > 1.1D;
-                bytesRemaining -= splitSize) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
splitSize, fileSize, new String[0], null));
-        }
-        if (bytesRemaining != 0L) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
bytesRemaining, fileSize, new String[0], null));
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 896b4968b4..9064017088 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 @Data
 public class IcebergSplit extends FileSplit {
+    // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, 
String[] hosts) {
         super(file, start, length, fileLength, hosts, null);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 47adc717a5..74ee6a855d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -457,6 +457,7 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         fileRangeDesc.setStartOffset(0);
         fileRangeDesc.setSize(firstFile.getSize());
         fileRangeDesc.setFileSize(firstFile.getSize());
+        fileRangeDesc.setModificationTime(firstFile.getModificationTime());
         // set TFileScanRange
         TFileScanRange fileScanRange = new TFileScanRange();
         fileScanRange.addToRanges(fileRangeDesc);
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 17f4d13216..1b0b94b923 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -1036,6 +1036,7 @@ public class FileSystemManager {
                     brokerFileStatus.setSize(fileStatus.getLen());
                     brokerFileStatus.setIsSplitable(true);
                 }
+                
brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
                 if (fileNameOnly) {
                     // return like this: file.txt
                     brokerFileStatus.setPath(fileStatus.getPath().getName());
diff --git a/gensrc/thrift/PaloBrokerService.thrift 
b/gensrc/thrift/PaloBrokerService.thrift
index 1d4d2e876a..308c606544 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -56,6 +56,7 @@ struct TBrokerFileStatus {
                                   //and the entire file must be imported as a 
complete map task.
                                   //the return value of the compressed file is 
false
     5: optional i64 blockSize; //Block size in FS. e.g. HDFS and S3
+    6: optional i64 modificationTime = 0; // Last modification time
 }
 
 struct TBrokerFD {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 3b332744a9..b2ac960a10 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -348,6 +348,8 @@ struct TFileRangeDesc {
     7: optional list<string> columns_from_path_keys;
     // For data lake table format
     8: optional TTableFormatFileDesc table_format_params
+    // Use modification time to determine whether the file is changed
+    9: optional i64 modification_time
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for 
reading and converting it.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to