This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 3c81bca5684 [opt](filecache) do not sync segment data into storage system #25691 (#25856) 3c81bca5684 is described below commit 3c81bca5684ba9ad1f4c914f61b32f3c7bdf19e4 Author: Mingyu Chen <morning...@163.com> AuthorDate: Wed Oct 25 16:49:11 2023 +0800 [opt](filecache) do not sync segment data into storage system #25691 (#25856) --- be/src/exec/tablet_info.cpp | 10 +++---- be/src/io/cache/block/block_file_segment.cpp | 3 +- be/src/io/fs/broker_file_system.cpp | 7 +++-- be/src/io/fs/broker_file_system.h | 3 +- be/src/io/fs/file_system.cpp | 5 ++-- be/src/io/fs/file_system.h | 7 +++-- .../{local_file_writer.h => file_writer_options.h} | 32 ++++------------------ be/src/io/fs/hdfs_file_system.cpp | 5 ++-- be/src/io/fs/hdfs_file_system.h | 3 +- be/src/io/fs/local_file_system.cpp | 7 +++-- be/src/io/fs/local_file_system.h | 3 +- be/src/io/fs/local_file_writer.cpp | 6 ++-- be/src/io/fs/local_file_writer.h | 3 +- be/src/io/fs/s3_file_system.cpp | 3 +- be/src/io/fs/s3_file_system.h | 3 +- be/test/olap/tablet_cooldown_test.cpp | 3 +- 16 files changed, 50 insertions(+), 53 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index fb5719bed4b..8df90973b05 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -93,8 +93,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { if (!_is_partial_update || _partial_update_input_columns.count(pcolumn_desc.name()) > 0) { std::string col_type = has_invalid_type ? 
"INVALID_TYPE" : pcolumn_desc.type(); - auto it = slots_map.find( - std::make_pair(to_lower(pcolumn_desc.name()), col_type)); + auto it = slots_map.find(std::make_pair(to_lower(pcolumn_desc.name()), col_type)); if (it == std::end(slots_map)) { return Status::InternalError("unknown index column, column={}, type={}", pcolumn_desc.name(), pcolumn_desc.type()); @@ -153,9 +152,10 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { index->index_id = t_index.id; index->schema_hash = t_index.schema_hash; for (auto& tcolumn_desc : t_index.columns_desc) { - TPrimitiveType::type col_type = has_invalid_type ? TPrimitiveType::INVALID_TYPE : tcolumn_desc.column_type.type; - auto it = slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name), - thrift_to_type(col_type))); + TPrimitiveType::type col_type = + has_invalid_type ? TPrimitiveType::INVALID_TYPE : tcolumn_desc.column_type.type; + auto it = slots_map.find( + std::make_pair(to_lower(tcolumn_desc.column_name), thrift_to_type(col_type))); if (!_is_partial_update || _partial_update_input_columns.count(tcolumn_desc.column_name) > 0) { if (it == slots_map.end()) { diff --git a/be/src/io/cache/block/block_file_segment.cpp b/be/src/io/cache/block/block_file_segment.cpp index 38d230d9bb8..3b3ac6a5eb3 100644 --- a/be/src/io/cache/block/block_file_segment.cpp +++ b/be/src/io/cache/block/block_file_segment.cpp @@ -159,7 +159,8 @@ Status FileBlock::append(Slice data) { Status st = Status::OK(); if (!_cache_writer) { auto download_path = get_path_in_local_cache(); - st = global_local_filesystem()->create_file(download_path, &_cache_writer); + FileWriterOptions not_sync {.sync_file_data = false}; + st = global_local_filesystem()->create_file(download_path, &_cache_writer, &not_sync); if (!st) { _cache_writer.reset(); return st; diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 5a4342027b1..3578fb91eaa 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ 
b/be/src/io/fs/broker_file_system.cpp @@ -95,7 +95,8 @@ Status BrokerFileSystem::connect_impl() { return status; } -Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer) { +Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, + const FileWriterOptions* opts) { *writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop, path, 0 /* offset */, getSPtr()); return Status::OK(); @@ -356,7 +357,7 @@ Status BrokerFileSystem::upload_impl(const Path& local_file, const Path& remote_ // NOTICE: broker writer must be closed before calling rename FileWriterPtr broker_writer = nullptr; - RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer)); + RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr)); constexpr size_t buf_sz = 1024 * 1024; char read_buf[buf_sz]; @@ -391,7 +392,7 @@ Status BrokerFileSystem::batch_upload_impl(const std::vector<Path>& local_files, Status BrokerFileSystem::direct_upload_impl(const Path& remote_file, const std::string& content) { FileWriterPtr broker_writer = nullptr; - RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer)); + RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr)); RETURN_IF_ERROR(broker_writer->append({content})); return broker_writer->close(); } diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h index a015f5c1f53..1e29b10a744 100644 --- a/be/src/io/fs/broker_file_system.h +++ b/be/src/io/fs/broker_file_system.h @@ -48,7 +48,8 @@ public: protected: Status connect_impl() override; - Status create_file_impl(const Path& file, FileWriterPtr* writer) override; + Status create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) override; Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; diff 
--git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp index 989a68884a6..c5d49d83f51 100644 --- a/be/src/io/fs/file_system.cpp +++ b/be/src/io/fs/file_system.cpp @@ -22,9 +22,10 @@ namespace doris { namespace io { -Status FileSystem::create_file(const Path& file, FileWriterPtr* writer) { +Status FileSystem::create_file(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) { auto path = absolute_path(file); - FILESYSTEM_M(create_file_impl(path, writer)); + FILESYSTEM_M(create_file_impl(path, writer, opts)); } Status FileSystem::open_file(const FileDescription& fd, const FileReaderOptions& reader_options, diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index 04e2615fb0b..4a0ee22e08a 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -32,6 +32,7 @@ #include "common/status.h" #include "io/fs/file_reader_options.h" #include "io/fs/file_reader_writer_fwd.h" +#include "io/fs/file_writer_options.h" #include "io/fs/fs_utils.h" #include "io/fs/path.h" @@ -74,7 +75,8 @@ class FileSystem : public std::enable_shared_from_this<FileSystem> { public: // The following are public interface. // And derived classes should implement all xxx_impl methods. 
- Status create_file(const Path& file, FileWriterPtr* writer); + Status create_file(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts = nullptr); Status open_file(const Path& file, FileReaderSPtr* reader) { FileDescription fd; fd.path = file.native(); @@ -115,7 +117,8 @@ public: protected: /// create file and return a FileWriter - virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) = 0; + virtual Status create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) = 0; /// open file and return a FileReader virtual Status open_file_impl(const FileDescription& fd, const Path& abs_file, diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/file_writer_options.h similarity index 57% copy from be/src/io/fs/local_file_writer.h copy to be/src/io/fs/file_writer_options.h index 11b2f16434b..511bd81a168 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/io/fs/file_writer_options.h @@ -17,35 +17,15 @@ #pragma once -#include <cstddef> - -#include "common/status.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "io/fs/path.h" -#include "util/slice.h" - namespace doris { namespace io { -class LocalFileWriter final : public FileWriter { -public: - LocalFileWriter(Path path, int fd, FileSystemSPtr fs); - LocalFileWriter(Path path, int fd); - ~LocalFileWriter() override; - - Status close() override; - Status abort() override; - Status appendv(const Slice* data, size_t data_cnt) override; - Status write_at(size_t offset, const Slice& data) override; - Status finalize() override; - -private: - Status _close(bool sync); - -private: - int _fd; // owned - bool _dirty = false; +// Only affects remote file writers +struct FileWriterOptions { + bool write_file_cache = false; + bool is_cold_data = false; + bool sync_file_data = true; // Whether flush data into storage system + int64_t file_cache_expiration = 0; // Absolute time }; } // namespace io diff --git 
a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 22a1516d07d..b8da1d24e61 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -164,7 +164,8 @@ Status HdfsFileSystem::connect_impl() { return Status::OK(); } -Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) { +Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) { *writer = std::make_unique<HdfsFileWriter>(file, getSPtr()); return Status::OK(); } @@ -315,7 +316,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file, const Path& remote_fi // 2. open remote file for write FileWriterPtr hdfs_writer = nullptr; - RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer)); + RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer, nullptr)); constexpr size_t buf_sz = 1024 * 1024; char read_buf[buf_sz]; diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index 229d236195a..f0a73ebed93 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -122,7 +122,8 @@ public: protected: Status connect_impl() override; - Status create_file_impl(const Path& file, FileWriterPtr* writer) override; + Status create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) override; Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 7e9f88c8b21..2574cec3ef4 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -55,13 +55,16 @@ LocalFileSystem::LocalFileSystem(Path&& root_path, std::string&& id) LocalFileSystem::~LocalFileSystem() = default; -Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) 
{ +Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) { int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC, 0666); if (-1 == fd) { return Status::IOError("failed to open {}: {}", file.native(), errno_to_str()); } + bool sync_data = opts != nullptr ? opts->sync_file_data : true; *writer = std::make_unique<LocalFileWriter>( - std::move(file), fd, std::static_pointer_cast<LocalFileSystem>(shared_from_this())); + std::move(file), fd, std::static_pointer_cast<LocalFileSystem>(shared_from_this()), + sync_data); return Status::OK(); } diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 5ede7b9ab91..da55cf0dfba 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -82,7 +82,8 @@ public: Status safe_glob(const std::string& path, std::vector<FileInfo>* res); protected: - Status create_file_impl(const Path& file, FileWriterPtr* writer) override; + Status create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) override; Status open_file_impl(const FileDescription& file_desc, const Path& abs_path, const FileReaderOptions& reader_options, FileReaderSPtr* reader) override; Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; diff --git a/be/src/io/fs/local_file_writer.cpp b/be/src/io/fs/local_file_writer.cpp index 62536dd6876..893c6ea9034 100644 --- a/be/src/io/fs/local_file_writer.cpp +++ b/be/src/io/fs/local_file_writer.cpp @@ -67,8 +67,8 @@ Status sync_dir(const io::Path& dirname) { namespace io { -LocalFileWriter::LocalFileWriter(Path path, int fd, FileSystemSPtr fs) - : FileWriter(std::move(path), fs), _fd(fd) { +LocalFileWriter::LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data) + : FileWriter(std::move(path), fs), _fd(fd), _sync_data(sync_data) { _opened = true; DorisMetrics::instance()->local_file_open_writing->increment(1); 
DorisMetrics::instance()->local_file_writer_total->increment(1); @@ -85,7 +85,7 @@ LocalFileWriter::~LocalFileWriter() { } Status LocalFileWriter::close() { - return _close(true); + return _close(_sync_data); } Status LocalFileWriter::abort() { diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h index 11b2f16434b..59329b178c5 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/io/fs/local_file_writer.h @@ -30,7 +30,7 @@ namespace io { class LocalFileWriter final : public FileWriter { public: - LocalFileWriter(Path path, int fd, FileSystemSPtr fs); + LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data = true); LocalFileWriter(Path path, int fd); ~LocalFileWriter() override; @@ -46,6 +46,7 @@ private: private: int _fd; // owned bool _dirty = false; + const bool _sync_data; }; } // namespace io diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index ca4fd0bda8b..79f2a324f44 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -128,7 +128,8 @@ Status S3FileSystem::connect_impl() { return Status::OK(); } -Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) { +Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) { GET_KEY(key, file); *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr()); return Status::OK(); diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index d2570a10588..301d97c73cd 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -68,7 +68,8 @@ public: protected: Status connect_impl() override; - Status create_file_impl(const Path& file, FileWriterPtr* writer) override; + Status create_file_impl(const Path& file, FileWriterPtr* writer, + const FileWriterOptions* opts) override; Status open_file_internal(const FileDescription& fd, const Path& abs_path, FileReaderSPtr* reader) override; 
Status create_directory_impl(const Path& dir, bool failed_if_exists = false) override; diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp index c87a09899e1..2582746291a 100644 --- a/be/test/olap/tablet_cooldown_test.cpp +++ b/be/test/olap/tablet_cooldown_test.cpp @@ -135,7 +135,8 @@ public: ~RemoteFileSystemMock() override = default; protected: - Status create_file_impl(const Path& path, io::FileWriterPtr* writer) override { + Status create_file_impl(const Path& path, io::FileWriterPtr* writer, + const io::FileWriterOptions* opts) override { Path fs_path = path; *writer = std::make_unique<FileWriterMock>(fs_path); return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org