This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 65a939cd574 [fix](file-writer) avoid empty file for segment writer (#31355)
65a939cd574 is described below

commit 65a939cd5746cd18202f2fea80fcae7580b66bdc
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sun Feb 25 21:39:48 2024 +0800

    [fix](file-writer) avoid empty file for segment writer (#31355)
    
    bp #31169
---
 be/src/io/fs/broker_file_system.cpp                         |  2 +-
 be/src/io/fs/broker_file_writer.cpp                         |  9 ++++++---
 be/src/io/fs/broker_file_writer.h                           |  2 +-
 be/src/io/fs/file_writer.h                                  |  1 +
 be/src/io/fs/file_writer_options.h                          |  2 ++
 be/src/io/fs/hdfs_file_system.cpp                           |  2 +-
 be/src/io/fs/hdfs_file_writer.cpp                           |  6 ++++--
 be/src/io/fs/hdfs_file_writer.h                             |  2 +-
 be/src/io/fs/s3_file_system.cpp                             |  2 +-
 be/src/io/fs/s3_file_writer.cpp                             | 13 ++++++++-----
 be/src/io/fs/s3_file_writer.h                               |  2 +-
 be/src/olap/rowset/beta_rowset_writer.cpp                   |  3 ++-
 .../rowset/segment_v2/inverted_index_compound_directory.cpp |  3 ++-
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp          |  3 ++-
 be/src/olap/tablet.cpp                                      |  3 ++-
 15 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index a3c93c04a7a..f0d6bc12380 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
 Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr* 
writer,
                                           const FileWriterOptions* opts) {
     *writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(), 
_broker_addr, _broker_prop,
-                                                 path, 0 /* offset */, 
getSPtr());
+                                                 path, 0 /* offset */, 
getSPtr(), opts);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index daba1af2bce..a46d22e1505 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -37,12 +37,15 @@ namespace io {
 
 BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress& 
broker_address,
                                    const std::map<std::string, std::string>& 
properties,
-                                   const std::string& path, int64_t 
start_offset, FileSystemSPtr fs)
+                                   const std::string& path, int64_t 
start_offset, FileSystemSPtr fs,
+                                   const FileWriterOptions* opts)
         : FileWriter(path, fs),
           _env(env),
           _address(broker_address),
           _properties(properties),
-          _cur_offset(start_offset) {}
+          _cur_offset(start_offset) {
+    _create_empty_file = opts ? opts->create_empty_file : true;
+}
 
 BrokerFileWriter::~BrokerFileWriter() {
     if (_opened) {
@@ -159,7 +162,7 @@ Status BrokerFileWriter::finalize() {
 }
 
 Status BrokerFileWriter::open() {
-    if (!_opened) {
+    if (_create_empty_file && !_opened) {
         RETURN_IF_ERROR(_open());
         _opened = true;
     }
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index e3e53525679..3e8edab0078 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter {
 public:
     BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
                      const std::map<std::string, std::string>& properties, 
const std::string& path,
-                     int64_t start_offset, FileSystemSPtr fs);
+                     int64_t start_offset, FileSystemSPtr fs, const 
FileWriterOptions* opts);
     virtual ~BrokerFileWriter();
 
     Status open() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 03f092c0424..1fd9b8391d9 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -67,6 +67,7 @@ protected:
     FileSystemSPtr _fs;
     bool _closed = false;
     bool _opened = false;
+    bool _create_empty_file = true;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/file_writer_options.h b/be/src/io/fs/file_writer_options.h
index 511bd81a168..4af38092373 100644
--- a/be/src/io/fs/file_writer_options.h
+++ b/be/src/io/fs/file_writer_options.h
@@ -26,6 +26,8 @@ struct FileWriterOptions {
     bool is_cold_data = false;
     bool sync_file_data = true;        // Whether flush data into storage 
system
     int64_t file_cache_expiration = 0; // Absolute time
+    // Whether to create empty file if no content
+    bool create_empty_file = true;
 };
 
 } // namespace io
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 6b44b219128..d3d54527836 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -167,7 +167,7 @@ Status HdfsFileSystem::connect_impl() {
 
 Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr* 
writer,
                                         const FileWriterOptions* opts) {
-    *writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
+    *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index a5f5dab9fd4..fe4b6cde199 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,7 +34,9 @@
 namespace doris {
 namespace io {
 
-HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) : 
FileWriter(std::move(file), fs) {
+HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const 
FileWriterOptions* opts)
+        : FileWriter(std::move(file), fs) {
+    _create_empty_file = opts ? opts->create_empty_file : true;
     _hdfs_fs = (HdfsFileSystem*)_fs.get();
 }
 
@@ -109,7 +111,7 @@ Status HdfsFileWriter::finalize() {
 }
 
 Status HdfsFileWriter::open() {
-    if (!_opened) {
+    if (_create_empty_file && !_opened) {
         RETURN_IF_ERROR(_open());
         _opened = true;
     }
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 812598e7a51..e62c6d6367e 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,7 +33,7 @@ class HdfsFileSystem;
 
 class HdfsFileWriter : public FileWriter {
 public:
-    HdfsFileWriter(Path file, FileSystemSPtr fs);
+    HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions* 
opts);
     ~HdfsFileWriter();
 
     Status open() override;
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index a92539713e2..cad49b4555c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -131,7 +131,7 @@ Status S3FileSystem::connect_impl() {
 Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
                                       const FileWriterOptions* opts) {
     GET_KEY(key, file);
-    *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, 
getSPtr());
+    *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, 
getSPtr(), opts);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 78c8f9355c9..aa7dcb573ea 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -78,7 +78,7 @@ bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer", 
"file_created");
 bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer", 
"file_being_written");
 
 S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const 
S3Conf& s3_conf,
-                           FileSystemSPtr fs)
+                           FileSystemSPtr fs, const FileWriterOptions* opts)
         : FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path, 
std::move(fs)),
           _bucket(s3_conf.bucket),
           _key(std::move(path)),
@@ -87,6 +87,7 @@ S3FileWriter::S3FileWriter(Path path, 
std::shared_ptr<S3Client> client, const S3
     s3_file_writer_total << 1;
     s3_file_being_written << 1;
 
+    _create_empty_file = opts ? opts->create_empty_file : true;
     Aws::Http::SetCompliantRfc3986Encoding(true);
 }
 
@@ -195,7 +196,7 @@ Status S3FileWriter::close() {
             // it might be one file less than 5MB, we do upload here
             _pending_buf->set_upload_remote_callback(
                     [this, buf = _pending_buf]() { _put_object(*buf); });
-        } else {
+        } else if (_create_empty_file) {
             // if there is no pending buffer, we need to create an empty file
             _pending_buf = S3FileBufferPool::GetInstance()->allocate();
             // if there is no upload id, we need to create a new one
@@ -211,9 +212,11 @@ Status S3FileWriter::close() {
             });
         }
     }
-    _countdown_event.add_count();
-    _pending_buf->submit();
-    _pending_buf = nullptr;
+    if (_pending_buf != nullptr) {
+        _countdown_event.add_count();
+        _pending_buf->submit();
+        _pending_buf = nullptr;
+    }
 
     RETURN_IF_ERROR(_complete());
 
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 2c139242ed4..ab4c1f9f47c 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -46,7 +46,7 @@ struct S3FileBuffer;
 class S3FileWriter final : public FileWriter {
 public:
     S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const 
S3Conf& s3_conf,
-                 FileSystemSPtr fs);
+                 FileSystemSPtr fs, const FileWriterOptions* opts);
     ~S3FileWriter() override;
 
     Status close() override;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index c90cd6ba079..2b47b3aaed8 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -706,7 +706,8 @@ Status BetaRowsetWriter::_do_create_segment_writer(
         return Status::Error<INIT_FAILED>("get fs failed");
     }
     io::FileWriterPtr file_writer;
-    Status st = fs->create_file(path, &file_writer);
+    io::FileWriterOptions opts {.create_empty_file = false};
+    Status st = fs->create_file(path, &file_writer, &opts);
     if (!st.ok()) {
         LOG(WARNING) << "failed to create writable file. path=" << path << ", 
err: " << st;
         return st;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 1af26a57674..e7f8f6abe26 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -596,7 +596,8 @@ void DorisCompoundDirectory::touchFile(const char* name) {
     snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, 
name);
 
     io::FileWriterPtr tmp_writer;
-    LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file 
IO error")
+    io::FileWriterOptions opts {.create_empty_file = false};
+    LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch 
file IO error")
 }
 
 int64_t DorisCompoundDirectory::fileLength(const char* name) const {
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 10cef8e1675..33d638b63f3 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -182,7 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer(
         return Status::Error<INIT_FAILED>("get fs failed");
     }
     io::FileWriterPtr file_writer;
-    Status st = fs->create_file(path, &file_writer);
+    io::FileWriterOptions opts {.create_empty_file = false};
+    Status st = fs->create_file(path, &file_writer, &opts);
     if (!st.ok()) {
         LOG(WARNING) << "failed to create writable file. path=" << path << ", 
err: " << st;
         return st;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 61fe618f032..47e089add41 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2267,8 +2267,9 @@ Status Tablet::write_cooldown_meta() {
     std::string remote_meta_path =
             remote_tablet_meta_path(tablet_id(), _cooldown_replica_id, 
_cooldown_term);
     io::FileWriterPtr tablet_meta_writer;
+    io::FileWriterOptions opts {.create_empty_file = false};
     // FIXME(plat1ko): What if object store permanently unavailable?
-    RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
+    RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer, 
&opts));
     auto val = tablet_meta_pb.SerializeAsString();
     RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()}));
     return tablet_meta_writer->close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to