This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3ba3b6c66f [opt](FileCache) use modification time to determine whether the file is changed (#18906) 3ba3b6c66f is described below commit 3ba3b6c66f9db38d980cafc4290c9217721b4e37 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Thu May 11 07:50:39 2023 +0800 [opt](FileCache) use modification time to determine whether the file is changed (#18906) Get the last modification time from the file status, and use the combination of the path and the modification time to generate the cache identifier. When a file is changed, its modification time changes too, so the cache entries built for the previous version of the file are no longer hit. --- be/src/io/cache/block/cached_remote_file_reader.cpp | 13 +++++++++---- be/src/io/cache/block/cached_remote_file_reader.h | 5 +++-- be/src/io/fs/file_reader_options.h | 2 ++ be/src/io/fs/remote_file_system.cpp | 6 ++++-- be/src/vec/exec/format/csv/csv_reader.cpp | 4 ++++ be/src/vec/exec/format/json/new_json_reader.cpp | 2 ++ be/src/vec/exec/format/orc/vorc_reader.cpp | 2 ++ be/src/vec/exec/format/parquet/vparquet_reader.cpp | 2 ++ .../doris/catalog/HiveMetaStoreClientHelper.java | 1 + .../org/apache/doris/common/util/BrokerUtil.java | 1 + .../doris/datasource/hive/HiveMetaStoreCache.java | 3 +++ .../apache/doris/fs/remote/BrokerFileSystem.java | 2 +- .../java/org/apache/doris/fs/remote/RemoteFile.java | 21 ++++++++++++++++++--- .../apache/doris/fs/remote/RemoteFileSystem.java | 4 ++-- .../org/apache/doris/fs/remote/S3FileSystem.java | 2 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 2 +- .../doris/planner/external/FileGroupInfo.java | 1 + .../doris/planner/external/FileQueryScanNode.java | 1 + .../apache/doris/planner/external/FileScanNode.java | 10 ++++++---- .../apache/doris/planner/external/FileSplit.java | 9 ++++++++- .../apache/doris/planner/external/HiveScanNode.java | 2 +- .../apache/doris/planner/external/TVFScanNode.java | 13 +------------ 
.../planner/external/iceberg/IcebergSplit.java | 1 + .../ExternalFileTableValuedFunction.java | 1 + .../apache/doris/broker/hdfs/FileSystemManager.java | 1 + gensrc/thrift/PaloBrokerService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 2 ++ 27 files changed, 80 insertions(+), 34 deletions(-) diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp b/be/src/io/cache/block/cached_remote_file_reader.cpp index f2c4c80173..81268ec5ad 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.cpp +++ b/be/src/io/cache/block/cached_remote_file_reader.cpp @@ -42,17 +42,22 @@ namespace doris { namespace io { CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, - const std::string& cache_path) + const std::string& cache_path, + const long modification_time) : _remote_file_reader(std::move(remote_file_reader)) { - _cache_key = IFileCache::hash(cache_path); + // Use path and modification time to build cache key + std::string unique_path = fmt::format("{}:{}", cache_path, modification_time); + _cache_key = IFileCache::hash(unique_path); _cache = FileCacheFactory::instance().get_by_path(_cache_key); } CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path, - const std::string& cache_path) + const std::string& cache_path, + const long modification_time) : _remote_file_reader(std::move(remote_file_reader)) { - _cache_key = IFileCache::hash(cache_path); + std::string unique_path = fmt::format("{}:{}", cache_path, modification_time); + _cache_key = IFileCache::hash(unique_path); _cache = FileCacheFactory::instance().get_by_path(cache_base_path); if (_cache == nullptr) { LOG(WARNING) << "Can't get cache from base path: " << cache_base_path diff --git a/be/src/io/cache/block/cached_remote_file_reader.h b/be/src/io/cache/block/cached_remote_file_reader.h index 68b80e0ce4..5e66f9970a 100644 --- a/be/src/io/cache/block/cached_remote_file_reader.h +++ 
b/be/src/io/cache/block/cached_remote_file_reader.h @@ -39,10 +39,11 @@ struct FileCacheStatistics; class CachedRemoteFileReader final : public FileReader { public: - CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path); + CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_path, + const long modification_time); CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const std::string& cache_base_path, - const std::string& cache_path); + const std::string& cache_path, const long modification_time); ~CachedRemoteFileReader() override; diff --git a/be/src/io/fs/file_reader_options.h b/be/src/io/fs/file_reader_options.h index 66629f54de..744db5e1f6 100644 --- a/be/src/io/fs/file_reader_options.h +++ b/be/src/io/fs/file_reader_options.h @@ -80,6 +80,8 @@ public: int64_t file_size = -1; bool has_cache_base_path = false; std::string cache_base_path; + // Use modification time to determine whether the file is changed + int64_t modification_time = 0; void specify_cache_path(const std::string& base_path) { has_cache_base_path = true; diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp index 63246a3c87..611f7cf894 100644 --- a/be/src/io/fs/remote_file_system.cpp +++ b/be/src/io/fs/remote_file_system.cpp @@ -97,9 +97,11 @@ Status RemoteFileSystem::open_file_impl(const Path& path, const FileReaderOption if (reader_options.has_cache_base_path) { // from query session variable: file_cache_base_path *reader = std::make_shared<CachedRemoteFileReader>( - std::move(raw_reader), reader_options.cache_base_path, cache_path); + std::move(raw_reader), reader_options.cache_base_path, cache_path, + reader_options.modification_time); } else { - *reader = std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path); + *reader = std::make_shared<CachedRemoteFileReader>(std::move(raw_reader), cache_path, + reader_options.modification_time); } break; } diff --git 
a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index cff15c88bb..7cb36e94c7 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -170,6 +170,8 @@ Status CsvReader::init_reader(bool is_load) { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, @@ -657,6 +659,8 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _file_description.start_offset = start_offset; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _range.__isset.modification_time ? _range.modification_time : 0; RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description, &_file_system, &_file_reader, reader_options)); if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 9643e48ab6..daa549776b 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -375,6 +375,8 @@ Status NewJsonReader::_open_file_reader() { RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader)); } else { io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _range.__isset.modification_time ? 
_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, io::DelegateReader::AccessMode::SEQUENTIAL, reader_options, _io_ctx, diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index ea36eec962..0f13b4d191 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -209,6 +209,8 @@ Status OrcReader::_create_file_reader() { if (_file_input_stream == nullptr) { io::FileReaderSPtr inner_reader; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _scan_range.__isset.modification_time ? _scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &inner_reader, io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 40d3492229..a9bf8dc6c8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -209,6 +209,8 @@ Status ParquetReader::_open_file() { SCOPED_RAW_TIMER(&_statistics.open_file_time); ++_statistics.open_file_num; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state); + reader_options.modification_time = + _scan_range.__isset.modification_time ? 
_scan_range.modification_time : 0; RETURN_IF_ERROR(io::DelegateReader::create_file_reader( _profile, _system_properties, _file_description, &_file_system, &_file_reader, io::DelegateReader::AccessMode::RANDOM, reader_options, _io_ctx)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 938f659159..184e61b1d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -233,6 +233,7 @@ public class HiveMetaStoreClientHelper { brokerFileStatus.setIsDir(fileLocation.isDirectory()); brokerFileStatus.setIsSplitable(true); brokerFileStatus.setSize(fileLocation.getSize()); + brokerFileStatus.setModificationTime(fileLocation.getModificationTime()); // filePath.toUri().getPath() = "/path/to/partition/file_name" // eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse // + /dae.db/customer/state=CA/city=SanJose/000000_0 diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 7fa0beda8d..0444abf443 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -102,6 +102,7 @@ public class BrokerUtil { if (r.isFile()) { TBrokerFileStatus status = new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()); status.setBlockSize(r.getBlockSize()); + status.setModificationTime(r.getModificationTime()); fileStatuses.add(status); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 3bd7daade5..f6382b80da 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -324,6 +324,7 @@ public class HiveMetaStoreCache { // Convert the hadoop split to Doris Split. for (int i = 0; i < splits.length; i++) { org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]); + // todo: get modification time result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null)); } } @@ -802,6 +803,7 @@ public class HiveMetaStoreCache { status.setPath(file.getPath()); status.length = file.getSize(); status.blockSize = file.getBlockSize(); + status.modificationTime = file.getModificationTime(); files.add(status); } @@ -823,6 +825,7 @@ public class HiveMetaStoreCache { Path path; long length; long blockSize; + long modificationTime; } @Data diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java index 14befa5c16..a2eb5560a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -581,7 +581,7 @@ public class BrokerFileSystem extends RemoteFileSystem { List<TBrokerFileStatus> fileStatus = rep.getFiles(); for (TBrokerFileStatus tFile : fileStatus) { - RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0); + RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0, tFile.getModificationTime()); result.add(file); } LOG.info("finished to list remote path {}. 
get files: {}", remotePath, result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java index cdfcf451c3..a8d918cffa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java @@ -33,25 +33,36 @@ public class RemoteFile { // A large file will split into multiple blocks. The blocks are transparent to the user. // Default block size for HDFS 2.x is 128M. private final long blockSize; + private long modificationTime; private Path path; BlockLocation[] blockLocations; public RemoteFile(String name, boolean isFile, long size, long blockSize) { - this(name, null, isFile, !isFile, size, blockSize, null); + this(name, null, isFile, !isFile, size, blockSize, 0, null); + } + + public RemoteFile(String name, boolean isFile, long size, long blockSize, long modificationTime) { + this(name, null, isFile, !isFile, size, blockSize, modificationTime, null); } public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, BlockLocation[] blockLocations) { - this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, blockLocations); + this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 0, blockLocations); + } + + public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, long modificationTime, + BlockLocation[] blockLocations) { + this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, modificationTime, blockLocations); } public RemoteFile(String name, Path path, boolean isFile, boolean isDirectory, - long size, long blockSize, BlockLocation[] blockLocations) { + long size, long blockSize, long modificationTime, BlockLocation[] blockLocations) { Preconditions.checkState(!Strings.isNullOrEmpty(name)); this.name = name; this.isFile = isFile; this.isDirectory = isDirectory; this.size = size; this.blockSize = 
blockSize; + this.modificationTime = modificationTime; this.path = path; this.blockLocations = blockLocations; } @@ -80,6 +91,10 @@ public class RemoteFile { return blockSize; } + public long getModificationTime() { + return modificationTime; + } + public BlockLocation[] getBlockLocations() { return blockLocations; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 35710f8785..7d87993733 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -58,8 +58,8 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { List<RemoteFile> locations = new ArrayList<>(); while (locatedFiles.hasNext()) { LocatedFileStatus fileStatus = locatedFiles.next(); - RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(), - fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getBlockLocations()); + RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(), + fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations()); locations.add(location); } return new RemoteFiles(locations); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index c6cfc0ba02..0d09037c81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -79,7 +79,7 @@ public class S3FileSystem extends ObjFileSystem { RemoteFile remoteFile = new RemoteFile( fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? 
-1 : fileStatus.getLen(), - fileStatus.getBlockSize()); + fileStatus.getBlockSize(), fileStatus.getModificationTime()); result.add(remoteFile); } } catch (FileNotFoundException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 26478782a7..d8df27a5c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -431,7 +431,7 @@ public class DFSFileSystem extends RemoteFileSystem { RemoteFile remoteFile = new RemoteFile( fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), - fileStatus.getBlockSize()); + fileStatus.getBlockSize(), fileStatus.getModificationTime()); result.add(remoteFile); } } catch (FileNotFoundException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java index a1e3499382..7808c35ac6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -337,6 +337,7 @@ public class FileGroupInfo { rangeDesc.setSize(fileStatus.size); rangeDesc.setFileSize(fileStatus.size); } + rangeDesc.setModificationTime(fileStatus.getModificationTime()); return rangeDesc; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 86e3dab30f..a36369325e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -306,6 +306,7 @@ public abstract class 
FileQueryScanNode extends FileScanNode { // need full path rangeDesc.setPath(fileSplit.getPath().toString()); } + rangeDesc.setModificationTime(fileSplit.getModificationTime()); return rangeDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java index 3a24ba3319..aa0e923203 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java @@ -212,7 +212,7 @@ public class FileScanNode extends ExternalScanNode { } protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, - boolean splittable, List<String> partitionValues) throws IOException { + long modificationTime, boolean splittable, List<String> partitionValues) throws IOException { if (blockLocations == null) { blockLocations = new BlockLocation[0]; } @@ -226,7 +226,7 @@ public class FileScanNode extends ExternalScanNode { if (!splittable) { LOG.debug("Path {} is not splittable.", path); String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); - result.add(new FileSplit(path, 0, length, length, hosts, partitionValues)); + result.add(new FileSplit(path, 0, length, length, modificationTime, hosts, partitionValues)); return result; } long bytesRemaining; @@ -234,12 +234,14 @@ public class FileScanNode extends ExternalScanNode { bytesRemaining -= splitSize) { int location = getBlockIndex(blockLocations, length - bytesRemaining); String[] hosts = location == -1 ? 
null : blockLocations[location].getHosts(); - result.add(new FileSplit(path, length - bytesRemaining, splitSize, length, hosts, partitionValues)); + result.add(new FileSplit(path, length - bytesRemaining, splitSize, + length, modificationTime, hosts, partitionValues)); } if (bytesRemaining != 0L) { int location = getBlockIndex(blockLocations, length - bytesRemaining); String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, length, hosts, partitionValues)); + result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, + length, modificationTime, hosts, partitionValues)); } LOG.debug("Path {} includes {} splits.", path, result.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java index 03f1ab3b60..21b88f9c2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -34,6 +34,7 @@ public class FileSplit implements Split { // -1 means unset. // If the file length is not set, the file length will be fetched from the file system. protected long fileLength; + protected long modificationTime; protected String[] hosts; protected TableFormatType tableFormatType; // The values of partitions. @@ -42,15 +43,21 @@ public class FileSplit implements Split { protected List<String> partitionValues; public FileSplit(Path path, long start, long length, long fileLength, - String[] hosts, List<String> partitionValues) { + long modificationTime, String[] hosts, List<String> partitionValues) { this.path = path; this.start = start; this.length = length; this.fileLength = fileLength; + this.modificationTime = modificationTime; this.hosts = hosts == null ? 
new String[0] : hosts; this.partitionValues = partitionValues; } + public FileSplit(Path path, long start, long length, long fileLength, + String[] hosts, List<String> partitionValues) { + this(path, start, length, fileLength, 0, hosts, partitionValues); + } + public String[] getHosts() { return hosts; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index 96c1872010..87d0e55796 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -170,7 +170,7 @@ public class HiveScanNode extends FileQueryScanNode { boolean isSplittable = fileCacheValue.isSplittable(); for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(), - status.getBlockLocations(), status.getLength(), + status.getBlockLocations(), status.getLength(), status.getModificationTime(), isSplittable, fileCacheValue.getPartitionValues())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java index a8ede0d843..67008f5d7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java @@ -111,7 +111,7 @@ public class TVFScanNode extends FileQueryScanNode { Path path = new Path(fileStatus.getPath()); try { splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(), - fileStatus.isSplitable, null)); + fileStatus.getModificationTime(), fileStatus.isSplitable, null)); } catch (IOException e) { LOG.warn("get file split failed for TVF: {}", path, e); throw new UserException(e); @@ -119,15 +119,4 @@ public class TVFScanNode extends FileQueryScanNode { } 
return splits; } - - private void addFileSplits(Path path, long fileSize, long splitSize, List<Split> splits) { - long bytesRemaining; - for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D; - bytesRemaining -= splitSize) { - splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null)); - } - if (bytesRemaining != 0L) { - splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null)); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index 896b4968b4..9064017088 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -26,6 +26,7 @@ import java.util.List; @Data public class IcebergSplit extends FileSplit { + // File path will be changed if the file is modified, so there's no need to get modification time. 
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) { super(file, start, length, fileLength, hosts, null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 47adc717a5..74ee6a855d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -457,6 +457,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio fileRangeDesc.setStartOffset(0); fileRangeDesc.setSize(firstFile.getSize()); fileRangeDesc.setFileSize(firstFile.getSize()); + fileRangeDesc.setModificationTime(firstFile.getModificationTime()); // set TFileScanRange TFileScanRange fileScanRange = new TFileScanRange(); fileScanRange.addToRanges(fileRangeDesc); diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 17f4d13216..1b0b94b923 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -1036,6 +1036,7 @@ public class FileSystemManager { brokerFileStatus.setSize(fileStatus.getLen()); brokerFileStatus.setIsSplitable(true); } + brokerFileStatus.setModificationTime(fileStatus.getModificationTime()); if (fileNameOnly) { // return like this: file.txt brokerFileStatus.setPath(fileStatus.getPath().getName()); diff --git a/gensrc/thrift/PaloBrokerService.thrift b/gensrc/thrift/PaloBrokerService.thrift index 1d4d2e876a..308c606544 100644 --- a/gensrc/thrift/PaloBrokerService.thrift +++ b/gensrc/thrift/PaloBrokerService.thrift @@ -56,6 +56,7 
@@ struct TBrokerFileStatus { //and the entire file must be imported as a complete map task. //the return value of the compressed file is false 5: optional i64 blockSize; //Block size in FS. e.g. HDFS and S3 + 6: optional i64 modificationTime = 0; // Last modification time } struct TBrokerFD { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 3b332744a9..b2ac960a10 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -348,6 +348,8 @@ struct TFileRangeDesc { 7: optional list<string> columns_from_path_keys; // For data lake table format 8: optional TTableFormatFileDesc table_format_params + // Use modification time to determine whether the file is changed + 9: optional i64 modification_time } // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org