This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 65a939cd574 [fix](file-writer) avoid empty file for segment writer (#31355) 65a939cd574 is described below commit 65a939cd5746cd18202f2fea80fcae7580b66bdc Author: Mingyu Chen <morning...@163.com> AuthorDate: Sun Feb 25 21:39:48 2024 +0800 [fix](file-writer) avoid empty file for segment writer (#31355) bp #31169 --- be/src/io/fs/broker_file_system.cpp | 2 +- be/src/io/fs/broker_file_writer.cpp | 9 ++++++--- be/src/io/fs/broker_file_writer.h | 2 +- be/src/io/fs/file_writer.h | 1 + be/src/io/fs/file_writer_options.h | 2 ++ be/src/io/fs/hdfs_file_system.cpp | 2 +- be/src/io/fs/hdfs_file_writer.cpp | 6 ++++-- be/src/io/fs/hdfs_file_writer.h | 2 +- be/src/io/fs/s3_file_system.cpp | 2 +- be/src/io/fs/s3_file_writer.cpp | 13 ++++++++----- be/src/io/fs/s3_file_writer.h | 2 +- be/src/olap/rowset/beta_rowset_writer.cpp | 3 ++- .../rowset/segment_v2/inverted_index_compound_directory.cpp | 3 ++- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 3 ++- be/src/olap/tablet.cpp | 3 ++- 15 files changed, 35 insertions(+), 20 deletions(-) diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index a3c93c04a7a..f0d6bc12380 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() { Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* writer, const FileWriterOptions* opts) { *writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), _broker_addr, _broker_prop, - path, 0 /* offset */, getSPtr()); + path, 0 /* offset */, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp index daba1af2bce..a46d22e1505 100644 --- a/be/src/io/fs/broker_file_writer.cpp +++ b/be/src/io/fs/broker_file_writer.cpp @@ -37,12 +37,15 @@ namespace io { BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map<std::string, std::string>& properties, - const std::string& path, int64_t start_offset, FileSystemSPtr fs) + const std::string& path, int64_t start_offset, FileSystemSPtr fs, + const FileWriterOptions* opts) : FileWriter(path, fs), _env(env), _address(broker_address), _properties(properties), - _cur_offset(start_offset) {} + _cur_offset(start_offset) { + _create_empty_file = opts ? opts->create_empty_file : true; +} BrokerFileWriter::~BrokerFileWriter() { if (_opened) { @@ -159,7 +162,7 @@ Status BrokerFileWriter::finalize() { } Status BrokerFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h index e3e53525679..3e8edab0078 100644 --- a/be/src/io/fs/broker_file_writer.h +++ b/be/src/io/fs/broker_file_writer.h @@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter { public: BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address, const std::map<std::string, std::string>& properties, const std::string& path, - int64_t start_offset, FileSystemSPtr fs); + int64_t start_offset, FileSystemSPtr fs, const FileWriterOptions* opts); virtual ~BrokerFileWriter(); Status open() override; diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h index 03f092c0424..1fd9b8391d9 100644 --- a/be/src/io/fs/file_writer.h +++ b/be/src/io/fs/file_writer.h @@ -67,6 +67,7 @@ protected: FileSystemSPtr _fs; bool _closed = false; bool _opened = false; + bool _create_empty_file = true; }; } // namespace io diff --git a/be/src/io/fs/file_writer_options.h b/be/src/io/fs/file_writer_options.h index 511bd81a168..4af38092373 100644 --- a/be/src/io/fs/file_writer_options.h +++ b/be/src/io/fs/file_writer_options.h @@ -26,6 +26,8 @@ struct FileWriterOptions { bool is_cold_data = false; bool sync_file_data = true; // Whether flush data into storage system int64_t file_cache_expiration = 0; // Absolute time + // Whether to create empty file if no content + bool create_empty_file = true; }; } // namespace io diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 6b44b219128..d3d54527836 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -167,7 +167,7 @@ Status HdfsFileSystem::connect_impl() { Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { - *writer = std::make_unique<HdfsFileWriter>(file, getSPtr()); + *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index a5f5dab9fd4..fe4b6cde199 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -34,7 +34,9 @@ namespace doris { namespace io { -HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : FileWriter(std::move(file), fs) { +HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts) + : FileWriter(std::move(file), fs) { + _create_empty_file = opts ? opts->create_empty_file : true; _hdfs_fs = (HdfsFileSystem*)_fs.get(); } @@ -109,7 +111,7 @@ Status HdfsFileWriter::finalize() { } Status HdfsFileWriter::open() { - if (!_opened) { + if (_create_empty_file && !_opened) { RETURN_IF_ERROR(_open()); _opened = true; } diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h index 812598e7a51..e62c6d6367e 100644 --- a/be/src/io/fs/hdfs_file_writer.h +++ b/be/src/io/fs/hdfs_file_writer.h @@ -33,7 +33,7 @@ class HdfsFileSystem; class HdfsFileWriter : public FileWriter { public: - HdfsFileWriter(Path file, FileSystemSPtr fs); + HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* opts); ~HdfsFileWriter(); Status open() override; diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index a92539713e2..cad49b4555c 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -131,7 +131,7 @@ Status S3FileSystem::connect_impl() { Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer, const FileWriterOptions* opts) { GET_KEY(key, file); - *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr()); + *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr(), opts); return Status::OK(); } diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 78c8f9355c9..aa7dcb573ea 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -78,7 +78,7 @@ bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer", "file_created"); bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer", "file_being_written"); S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3Conf& s3_conf, - FileSystemSPtr fs) + FileSystemSPtr fs, const FileWriterOptions* opts) : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, std::move(fs)), _bucket(s3_conf.bucket), _key(std::move(path)), @@ -87,6 +87,7 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3 s3_file_writer_total << 1; s3_file_being_written << 1; + _create_empty_file = opts ? opts->create_empty_file : true; Aws::Http::SetCompliantRfc3986Encoding(true); } @@ -195,7 +196,7 @@ Status S3FileWriter::close() { // it might be one file less than 5MB, we do upload here _pending_buf->set_upload_remote_callback( [this, buf = _pending_buf]() { _put_object(*buf); }); - } else { + } else if (_create_empty_file) { // if there is no pending buffer, we need to create an empty file _pending_buf = S3FileBufferPool::GetInstance()->allocate(); // if there is no upload id, we need to create a new one @@ -211,9 +212,11 @@ Status S3FileWriter::close() { }); } } - _countdown_event.add_count(); - _pending_buf->submit(); - _pending_buf = nullptr; + if (_pending_buf != nullptr) { + _countdown_event.add_count(); + _pending_buf->submit(); + _pending_buf = nullptr; + } RETURN_IF_ERROR(_complete()); diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h index 2c139242ed4..ab4c1f9f47c 100644 --- a/be/src/io/fs/s3_file_writer.h +++ b/be/src/io/fs/s3_file_writer.h @@ -46,7 +46,7 @@ struct S3FileBuffer; class S3FileWriter final : public FileWriter { public: S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf, - FileSystemSPtr fs); + FileSystemSPtr fs, const FileWriterOptions* opts); ~S3FileWriter() override; Status close() override; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index c90cd6ba079..2b47b3aaed8 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -706,7 +706,8 @@ Status BetaRowsetWriter::_do_create_segment_writer( return Status::Error<INIT_FAILED>("get fs failed"); } io::FileWriterPtr file_writer; - Status st = fs->create_file(path, &file_writer); + io::FileWriterOptions opts {.create_empty_file = false}; + Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index 1af26a57674..e7f8f6abe26 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -596,7 +596,8 @@ void DorisCompoundDirectory::touchFile(const char* name) { snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name); io::FileWriterPtr tmp_writer; - LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file IO error") + io::FileWriterOptions opts {.create_empty_file = false}; + LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch file IO error") } int64_t DorisCompoundDirectory::fileLength(const char* name) const { diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 10cef8e1675..33d638b63f3 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -182,7 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer( return Status::Error<INIT_FAILED>("get fs failed"); } io::FileWriterPtr file_writer; - Status st = fs->create_file(path, &file_writer); + io::FileWriterOptions opts {.create_empty_file = false}; + Status st = fs->create_file(path, &file_writer, &opts); if (!st.ok()) { LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; return st; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 61fe618f032..47e089add41 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2267,8 +2267,9 @@ Status Tablet::write_cooldown_meta() { std::string remote_meta_path = remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, _cooldown_term); io::FileWriterPtr tablet_meta_writer; + io::FileWriterOptions opts {.create_empty_file = false}; // FIXME(plat1ko): What if object store permanently unavailable? - RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer)); + RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer, &opts)); auto val = tablet_meta_pb.SerializeAsString(); RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()})); return tablet_meta_writer->close(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org