This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new be06ef4b0eb [fix](file-writer) avoid empty file for segment writer (#31169) be06ef4b0eb is described below commit be06ef4b0ebceb40a6c8888dafba1f26e38c7a5f Author: Mingyu Chen <morning...@163.com> AuthorDate: Wed Feb 21 09:00:31 2024 +0800 [fix](file-writer) avoid empty file for segment writer (#31169) --- be/src/io/fs/broker_file_system.cpp | 2 +- be/src/io/fs/broker_file_writer.cpp | 9 ++++++--- be/src/io/fs/broker_file_writer.h | 2 +- be/src/io/fs/file_writer.h | 3 +++ be/src/io/fs/hdfs_file_system.cpp | 4 ++-- be/src/io/fs/hdfs_file_writer.cpp | 6 ++++-- be/src/io/fs/hdfs_file_writer.h | 2 +- be/src/io/fs/s3_file_writer.cpp | 12 ++++++++---- be/src/olap/rowset/beta_rowset_writer.cpp | 2 +- 9 files changed, 27 insertions(+), 15 deletions(-) diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 576a7dd8959..44582a0ff8d 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() { Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, const FileWriterOptions* opts) { *writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop, - path, 0 /* offset */, getSPtr()); + path, 0 /* offset */, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index 0d305bf269b..75acf40084c 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -37,12 +37,15 @@ namespace io { BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map<std::string, std::string>& properties, - const std::string& path, int64_t start_offset, FileSystemSPtr fs) + const std::string& path, int64_t start_offset, FileSystemSPtr fs, + const FileWriterOptions* opts) : FileWriter(path, fs), _env(env), _address(broker_address), _properties(properties), - _cur_offset(start_offset) {} + _cur_offset(start_offset) { + _create_empty_file = opts ? opts->create_empty_file : true; +} BrokerFileWriter::~BrokerFileWriter() { if (_opened) { @@ -154,7 +157,7 @@ Status BrokerFileWriter::finalize() { } Status BrokerFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index f132545f0a8..05b62846e6c 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter { public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map<std::string, std::string>& properties, const std::string& path, - int64_t start_offset, FileSystemSPtr fs); + int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts); virtual ~BrokerFileWriter(); Status open() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 256c67a9838..bb3235e7d27 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -35,6 +35,8 @@ struct FileWriterOptions { bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system int64_t file_cache_expiration = 0; // Absolute time + // Whether to create empty file if no content + bool create_empty_file = true; }; class FileWriter { @@ -77,6 +79,7 @@ protected: std::shared_ptr<FileSystem> _fs; bool _closed = false; bool _opened = false; + bool _create_empty_file = true; }; using FileWriterPtr = std::unique_ptr<FileWriter>; diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 8ada4b92acc..a65784226f3 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -167,8 +167,8 @@ Status HdfsFileSystem::connect_impl() { } Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, - const FileWriterOptions*) { - *writer = std::make_unique<HdfsFileWriter>(file, getSPtr()); + const FileWriterOptions* opts) { + *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index 1f262e1abcd..40c3c59dcd7 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,7 +34,9 @@ namespace doris { namespace io { -HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) { +HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts) + : FileWriter(std::move(file), fs) { + _create_empty_file = opts ? opts->create_empty_file : true; _hdfs_fs = (HdfsFileSystem*)_fs.get(); } @@ -104,7 +106,7 @@ Status HdfsFileWriter::finalize() { } Status HdfsFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index bffd0efdca9..21dcaff1cd6 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,7 +33,7 @@ class HdfsFileSystem; class HdfsFileWriter : public FileWriter { public: - HdfsFileWriter(Path file, FileSystemSPtr fs); + HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts); ~HdfsFileWriter(); Status open() override; diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 7711529b6f5..dbe5ce8e70c 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -98,6 +98,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs, _cache_key = IFileCache::hash(_path.filename().native()); _cache = FileCacheFactory::instance()->get_by_path(_cache_key); } + + _create_empty_file = opts ? opts->create_empty_file : true; } S3FileWriter::~S3FileWriter() { @@ -205,7 +207,7 @@ Status S3FileWriter::close() { auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get()); DCHECK(buf != nullptr); buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); }); - } else { + } else if (_create_empty_file) { // if there is no pending buffer, we need to create an empty file auto builder = FileBufferBuilder(); builder.set_type(BufferType::UPLOAD) @@ -232,9 +234,11 @@ Status S3FileWriter::close() { DCHECK(buf != nullptr); } } - _countdown_event.add_count(); - RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); - _pending_buf = nullptr; + if (_pending_buf != nullptr) { + _countdown_event.add_count(); + RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf))); + _pending_buf = nullptr; + } DBUG_EXECUTE_IF("s3_file_writer::close", { RETURN_IF_ERROR(_complete()); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index b11a236ebf2..8556e19483f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -701,7 +701,7 @@ Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWrite _context.file_cache_ttl_sec > 0 && _context.newest_write_timestamp > 0 ? _context.newest_write_timestamp + _context.file_cache_ttl_sec : 0, - }; + .create_empty_file = false}; Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org