This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new eab8876abc [Feature](remote) Using heavy schema change if the table is not enable light weight schema change (#13487) eab8876abc is described below commit eab8876abc536cd338c1c7c080b9f92dee8a6d64 Author: pengxiangyu <diablo...@163.com> AuthorDate: Fri Oct 28 15:48:22 2022 +0800 [Feature](remote) Using heavy schema change if the table is not enable light weight schema change (#13487) --- be/src/io/CMakeLists.txt | 1 + be/src/io/fs/file_system.h | 2 +- be/src/io/fs/file_system_map.cpp | 4 +- be/src/io/fs/file_system_map.h | 6 +- be/src/io/fs/local_file_system.cpp | 7 +- be/src/io/fs/local_file_system.h | 2 +- be/src/io/fs/s3_file_system.cpp | 5 +- be/src/io/fs/s3_file_system.h | 1 - be/src/io/fs/s3_file_writer.cpp | 243 +++++++++++++++++++++ be/src/io/fs/s3_file_writer.h | 79 +++++++ be/src/olap/data_dir.h | 4 +- be/src/olap/rowset/beta_rowset.cpp | 55 +++-- be/src/olap/rowset/beta_rowset.h | 7 +- be/src/olap/rowset/beta_rowset_writer.cpp | 20 +- be/src/olap/rowset/rowset.h | 3 + be/src/olap/rowset/rowset_meta.h | 8 +- be/src/olap/rowset/rowset_writer_context.h | 6 +- be/src/olap/rowset/segment_v2/segment.cpp | 2 +- be/src/olap/rowset/segment_v2/segment.h | 6 +- be/src/olap/schema_change.cpp | 14 +- be/src/olap/snapshot_manager.cpp | 2 +- be/src/olap/tablet.cpp | 17 +- be/src/olap/tablet.h | 6 + be/src/olap/tablet_meta.cpp | 2 +- be/src/olap/tablet_meta.h | 2 +- be/src/util/doris_metrics.cpp | 8 + be/src/util/doris_metrics.h | 4 + be/test/olap/rowid_conversion_test.cpp | 2 +- be/test/olap/rowset/beta_rowset_test.cpp | 2 +- .../olap/rowset/segment_v2/bitmap_index_test.cpp | 5 +- 30 files changed, 457 insertions(+), 68 deletions(-) diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt index 0941505ba0..4096d2557b 100644 --- a/be/src/io/CMakeLists.txt +++ b/be/src/io/CMakeLists.txt @@ -40,6 +40,7 @@ set(IO_FILES fs/local_file_writer.cpp fs/s3_file_reader.cpp fs/s3_file_system.cpp + fs/s3_file_writer.cpp cache/dummy_file_cache.cpp cache/file_cache.cpp cache/file_cache_manager.cpp diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index e3c4a19018..e7d4fbbb88 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -80,7 +80,7 @@ protected: FileSystemType _type; }; -using FileSystemPtr = std::shared_ptr<FileSystem>; +using FileSystemSPtr = std::shared_ptr<FileSystem>; } // namespace io } // namespace doris diff --git a/be/src/io/fs/file_system_map.cpp b/be/src/io/fs/file_system_map.cpp index 2467781e53..18d7f454e8 100644 --- a/be/src/io/fs/file_system_map.cpp +++ b/be/src/io/fs/file_system_map.cpp @@ -27,12 +27,12 @@ FileSystemMap* FileSystemMap::instance() { return ↦ } -void FileSystemMap::insert(ResourceId id, FileSystemPtr fs) { +void FileSystemMap::insert(ResourceId id, FileSystemSPtr fs) { std::unique_lock wlock(_mu); _map.try_emplace(std::move(id), std::move(fs)); } -FileSystemPtr FileSystemMap::get(const ResourceId& id) { +FileSystemSPtr FileSystemMap::get(const ResourceId& id) { std::shared_lock rlock(_mu); auto it = _map.find(id); if (it != _map.end()) { diff --git a/be/src/io/fs/file_system_map.h b/be/src/io/fs/file_system_map.h index a7a4ef57fc..c208db59aa 100644 --- a/be/src/io/fs/file_system_map.h +++ b/be/src/io/fs/file_system_map.h @@ -31,17 +31,17 @@ public: static FileSystemMap* instance(); ~FileSystemMap() = default; - void insert(ResourceId id, FileSystemPtr fs); + void insert(ResourceId id, FileSystemSPtr fs); // If `id` is not in `_map`, return nullptr. 
- FileSystemPtr get(const ResourceId& id); + FileSystemSPtr get(const ResourceId& id); private: FileSystemMap() = default; private: std::shared_mutex _mu; - std::unordered_map<ResourceId, FileSystemPtr> _map; // GUARED_BY(_mu) + std::unordered_map<ResourceId, FileSystemSPtr> _map; // GUARED_BY(_mu) }; } // namespace io diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp index 46098e060f..d9e43c21a8 100644 --- a/be/src/io/fs/local_file_system.cpp +++ b/be/src/io/fs/local_file_system.cpp @@ -142,9 +142,10 @@ Status LocalFileSystem::list(const Path& path, std::vector<Path>* files) { return Status::OK(); } -LocalFileSystem* global_local_filesystem() { - static LocalFileSystem fs(""); - return &fs; +static FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>(""); + +FileSystemSPtr global_local_filesystem() { + return local_fs; } } // namespace io diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 2363edc77f..1477d0aa99 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -50,7 +50,7 @@ private: Path absolute_path(const Path& path) const; }; -LocalFileSystem* global_local_filesystem(); +FileSystemSPtr global_local_filesystem(); } // namespace io } // namespace doris diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 00a4eed292..68feb418e3 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -19,6 +19,7 @@ #include <aws/core/utils/threading/Executor.h> #include <aws/s3/S3Client.h> +#include <aws/s3/model/CopyObjectRequest.h> #include <aws/s3/model/DeleteObjectRequest.h> #include <aws/s3/model/DeleteObjectsRequest.h> #include <aws/s3/model/HeadObjectRequest.h> @@ -35,6 +36,7 @@ #include "gutil/strings/stringpiece.h" #include "io/fs/remote_file_system.h" #include "io/fs/s3_file_reader.h" +#include "io/fs/s3_file_writer.h" namespace doris { namespace io { @@ -136,7 +138,8 @@ Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths, } Status S3FileSystem::create_file(const Path& path, FileWriterPtr* writer) { - return Status::NotSupported("not support"); + *writer = std::make_unique<S3FileWriter>(Path(get_key(path)), get_client(), _s3_conf); + return Status::OK(); } Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) { diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h index 9eb393996f..46510d3aa0 100644 --- a/be/src/io/fs/s3_file_system.h +++ b/be/src/io/fs/s3_file_system.h @@ -75,7 +75,6 @@ public: // Guarded by external lock. void set_sk(std::string sk) { _s3_conf.sk = std::move(sk); } -private: std::string get_key(const Path& path) const; private: diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp new file mode 100644 index 0000000000..0a5848726a --- /dev/null +++ b/be/src/io/fs/s3_file_writer.cpp @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "io/fs/s3_file_writer.h" + +#include <aws/core/Aws.h> +#include <aws/core/utils/HashingUtils.h> +#include <aws/s3/S3Client.h> +#include <aws/s3/model/AbortMultipartUploadRequest.h> +#include <aws/s3/model/CompleteMultipartUploadRequest.h> +#include <aws/s3/model/CreateMultipartUploadRequest.h> +#include <aws/s3/model/DeleteObjectRequest.h> +#include <aws/s3/model/DeleteObjectsRequest.h> +#include <aws/s3/model/GetObjectRequest.h> +#include <aws/s3/model/UploadPartRequest.h> +#include <fmt/core.h> +#include <sys/uio.h> + +#include <cerrno> + +#include "common/compiler_util.h" +#include "common/status.h" +#include "gutil/macros.h" +#include "io/fs/file_writer.h" +#include "io/fs/path.h" +#include "io/fs/s3_file_system.h" +#include "util/doris_metrics.h" + +using Aws::S3::Model::AbortMultipartUploadRequest; +using Aws::S3::Model::CompletedPart; +using Aws::S3::Model::CompletedMultipartUpload; +using Aws::S3::Model::CompleteMultipartUploadRequest; +using Aws::S3::Model::CreateMultipartUploadRequest; +using Aws::S3::Model::DeleteObjectRequest; +using Aws::S3::Model::UploadPartRequest; +using Aws::S3::Model::UploadPartOutcome; + +namespace doris { +namespace io { + +// max size of each part when uploading: 5MB +static const int MAX_SIZE_EACH_PART = 5 * 1024 * 1024; +static const char* STREAM_TAG = "S3FileWriter"; + +S3FileWriter::S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, + const S3Conf& s3_conf) + : FileWriter(std::move(path)), _client(client), _s3_conf(s3_conf) { + DorisMetrics::instance()->s3_file_open_writing->increment(1); + DorisMetrics::instance()->s3_file_writer_total->increment(1); +} + +S3FileWriter::~S3FileWriter() { + if (!_closed) { + WARN_IF_ERROR(abort(), fmt::format("Cannot abort {}", _path.native())); + } +} + +Status S3FileWriter::close() { + return _close(); +} + +Status S3FileWriter::abort() { + AbortMultipartUploadRequest request; + request.WithBucket(_s3_conf.bucket).WithKey(_path.native()).WithUploadId(_upload_id); + auto outcome = _client->AbortMultipartUpload(request); + if (outcome.IsSuccess() || + outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD || + outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + LOG(INFO) << "Abort multipart upload successfully. 
endpoint=" << _s3_conf.endpoint + << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native() + << ", upload_id=" << _upload_id; + return Status::OK(); + } + return Status::IOError( + "failed to abort multipart upload(endpoint={}, bucket={}, key={}, upload_id={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, _path.native(), _upload_id, + outcome.GetError().GetMessage()); +} + +Status S3FileWriter::_open() { + CreateMultipartUploadRequest create_request; + create_request.WithBucket(_s3_conf.bucket).WithKey(_path.native()); + create_request.SetContentType("text/plain"); + + _reset_stream(); + auto outcome = _client->CreateMultipartUpload(create_request); + + if (outcome.IsSuccess()) { + _upload_id = outcome.GetResult().GetUploadId(); + LOG(INFO) << "create multi part upload successfully (endpoint=" << _s3_conf.endpoint + << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native() + << ") upload_id: " << _upload_id; + return Status::OK(); + } + return Status::IOError( + "failed to create multi part upload (endpoint={}, bucket={}, key={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, _path.native(), outcome.GetError().GetMessage()); +} + +Status S3FileWriter::append(const Slice& data) { + Status st = appendv(&data, 1); + if (st.ok()) { + DorisMetrics::instance()->s3_bytes_written_total->increment(data.size); + } + return st; +} + +Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) { + DCHECK(!_closed); + if (!_is_open) { + RETURN_IF_ERROR(_open()); + _is_open = true; + } + + for (size_t i = 0; i < data_cnt; i++) { + const Slice& result = data[i]; + _stream_ptr->write(result.data, result.size); + _bytes_appended += result.size; + auto start_pos = _stream_ptr->tellg(); + _stream_ptr->seekg(0LL, _stream_ptr->end); + _stream_ptr->seekg(start_pos); + } + if (_stream_ptr->str().size() >= MAX_SIZE_EACH_PART) { + RETURN_IF_ERROR(_upload_part()); + } + return Status::OK(); +} + +Status S3FileWriter::_upload_part() { + if (_stream_ptr->str().size() == 0) { + return Status::OK(); + } + ++_cur_part_num; + + UploadPartRequest upload_request; + upload_request.WithBucket(_s3_conf.bucket) + .WithKey(_path.native()) + .WithPartNumber(_cur_part_num) + .WithUploadId(_upload_id); + + upload_request.SetBody(_stream_ptr); + + Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr)); + upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5)); + + auto start_pos = _stream_ptr->tellg(); + _stream_ptr->seekg(0LL, _stream_ptr->end); + upload_request.SetContentLength(static_cast<long>(_stream_ptr->tellg())); + _stream_ptr->seekg(start_pos); + + auto upload_part_callable = _client->UploadPartCallable(upload_request); + + UploadPartOutcome upload_part_outcome = upload_part_callable.get(); + _reset_stream(); + if (!upload_part_outcome.IsSuccess()) { + return Status::IOError( + "failed to upload part (endpoint={}, bucket={}, key={}, part_num = {}): {}", + _s3_conf.endpoint, _s3_conf.bucket, _path.native(), _cur_part_num, + upload_part_outcome.GetError().GetMessage()); + } + + std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>(); + completed_part->SetPartNumber(_cur_part_num); + auto etag = upload_part_outcome.GetResult().GetETag(); + DCHECK(etag.empty()); + completed_part->SetETag(etag); + _completed_parts.emplace_back(completed_part); + return Status::OK(); +} + +void S3FileWriter::_reset_stream() { + _stream_ptr = Aws::MakeShared<Aws::StringStream>(STREAM_TAG, ""); +} + +Status S3FileWriter::finalize() { + DCHECK(!_closed); + 
if (_is_open) { + _close(); + } + return Status::OK(); +} + +Status S3FileWriter::_close() { + if (_closed) { + return Status::OK(); + } + if (_is_open) { + RETURN_IF_ERROR(_upload_part()); + + CompleteMultipartUploadRequest complete_request; + complete_request.WithBucket(_s3_conf.bucket) + .WithKey(_path.native()) + .WithUploadId(_upload_id); + + CompletedMultipartUpload completed_upload; + for (std::shared_ptr<CompletedPart> part : _completed_parts) { + completed_upload.AddParts(*part); + } + + complete_request.WithMultipartUpload(completed_upload); + + auto compute_outcome = _client->CompleteMultipartUpload(complete_request); + + if (!compute_outcome.IsSuccess()) { + return Status::IOError( + "failed to create multi part upload (endpoint={}, bucket={}, key={}): {}", + _s3_conf.endpoint, _s3_conf.bucket, _path.native(), + compute_outcome.GetError().GetMessage()); + } + _is_open = false; + } + _closed = true; + + DorisMetrics::instance()->s3_file_open_writing->increment(-1); + DorisMetrics::instance()->s3_file_created_total->increment(1); + DorisMetrics::instance()->s3_bytes_written_total->increment(_bytes_appended); + + LOG(INFO) << "complete multi part upload successfully (endpoint=" << _s3_conf.endpoint + << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native() + << ") upload_id: " << _upload_id; + return Status::OK(); +} + +Status S3FileWriter::write_at(size_t offset, const Slice& data) { + return Status::NotSupported("not support"); +} + +} // namespace io +} // namespace doris diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h new file mode 100644 index 0000000000..d3abc19ba8 --- /dev/null +++ b/be/src/io/fs/s3_file_writer.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <cstddef> +#include <list> + +#include "io/fs/file_writer.h" +#include "util/s3_util.h" + +namespace Aws::S3 { +namespace Model { +class CompletedPart; +} +class S3Client; +} // namespace Aws::S3 + +namespace doris { +namespace io { + +class S3FileWriter final : public FileWriter { +public: + S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf); + ~S3FileWriter() override; + + Status close() override; + + Status abort() override; + + Status append(const Slice& data) override; + + Status appendv(const Slice* data, size_t data_cnt) override; + + Status write_at(size_t offset, const Slice& data) override; + + Status finalize() override; + + size_t bytes_appended() const override { return _bytes_appended; } + +private: + Status _close(); + + Status _open(); + + Status _upload_part(); + + void _reset_stream(); + +private: + std::shared_ptr<Aws::S3::S3Client> _client; + S3Conf _s3_conf; + std::string _upload_id; + bool _is_open = false; + bool _closed = false; + size_t _bytes_appended = 0; + + std::shared_ptr<Aws::StringStream> _stream_ptr; + // Current Part Num for CompletedPart + int _cur_part_num = 0; + std::list<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts; +}; + +} // namespace io +} // namespace doris diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 4cf40fd1bd..20b23c1405 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -55,7 +55,7 @@ public: const std::string& path() const { return _path; } size_t path_hash() const { return _path_hash; } - const io::FileSystemPtr& fs() const { return _fs; } + const io::FileSystemSPtr& fs() const { return _fs; } bool is_used() const { return _is_used; } void set_is_used(bool is_used) { _is_used = is_used; } @@ -169,7 +169,7 @@ private: std::string _path; size_t _path_hash; - io::FileSystemPtr _fs; + io::FileSystemSPtr _fs; // user specified capacity int64_t _capacity_bytes; // the actual available capacity of the disk of this data dir diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 5f1985bd8f..0d99676ecc 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -38,15 +38,12 @@ namespace doris { using io::FileCacheManager; std::string BetaRowset::segment_file_path(int segment_id) { - if (is_local()) { - return local_segment_path(_tablet_path, rowset_id(), segment_id); - } #ifdef BE_TEST if (!config::file_cache_type.empty()) { - return local_segment_path(_tablet_path, rowset_id(), segment_id); + return segment_file_path(_tablet_path, rowset_id(), segment_id); } #endif - return remote_segment_path(_rowset_meta->tablet_id(), rowset_id(), segment_id); + return segment_file_path(_rowset_dir, rowset_id(), segment_id); } std::string BetaRowset::segment_cache_path(int segment_id) { @@ -54,34 +51,38 @@ std::string BetaRowset::segment_cache_path(int segment_id) { return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id); } -std::string BetaRowset::local_segment_path(const std::string& tablet_path, - const RowsetId& rowset_id, int segment_id) { - // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}.dat - return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(), segment_id); +std::string BetaRowset::segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, + int segment_id) { + // {rowset_dir}/{schema_hash}/{rowset_id}_{seg_num}.dat + return fmt::format("{}/{}_{}.dat", rowset_dir, 
rowset_id.to_string(), segment_id); } -std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id, - int segment_id) { - // data/{tablet_id}/{rowset_id}_{seg_num}.dat - return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, segment_id); +std::string BetaRowset::remote_tablet_path(int64_t tablet_id) { + // data/{tablet_id} + return fmt::format("{}/{}", DATA_PREFIX, tablet_id); } std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id) { // data/{tablet_id}/{rowset_id}_{seg_num}.dat - return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id.to_string(), - segment_id); + return remote_segment_path(tablet_id, rowset_id.to_string(), segment_id); } -std::string BetaRowset::local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id, - int segment_id) { - // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} - return fmt::format("{}/{}_{}", tablet_path, rowset_id.to_string(), segment_id); +std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id, + int segment_id) { + // data/{tablet_id}/{rowset_id}_{seg_num}.dat + return fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, segment_id); } BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path, RowsetMetaSharedPtr rowset_meta) - : Rowset(schema, tablet_path, std::move(rowset_meta)) {} + : Rowset(schema, tablet_path, std::move(rowset_meta)) { + if (_rowset_meta->is_local()) { + _rowset_dir = tablet_path; + } else { + _rowset_dir = remote_tablet_path(_rowset_meta->tablet_id()); + } +} BetaRowset::~BetaRowset() = default; @@ -188,9 +189,10 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id) return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); } for (int i = 0; i < num_segments(); ++i) { - auto dst_path = local_segment_path(dir, new_rowset_id, i); + auto dst_path = segment_file_path(dir, new_rowset_id, i); // TODO(lingbin): use Env API? or EnvUtil? 
- if (FileUtils::check_exist(dst_path)) { + bool dst_path_exist = false; + if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { LOG(WARNING) << "failed to create hard link, file already exist: " << dst_path; return Status::OLAPInternalError(OLAP_ERR_FILE_ALREADY_EXIST); } @@ -209,7 +211,7 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id) Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) { DCHECK(is_local()); for (int i = 0; i < num_segments(); ++i) { - auto dst_path = local_segment_path(dir, new_rowset_id, i); + auto dst_path = segment_file_path(dir, new_rowset_id, i); Status status = Env::Default()->path_exists(dst_path); if (status.ok()) { LOG(WARNING) << "file already exist: " << dst_path; @@ -266,7 +268,12 @@ bool BetaRowset::check_path(const std::string& path) { bool BetaRowset::check_file_exist() { for (int i = 0; i < num_segments(); ++i) { auto seg_path = segment_file_path(i); - if (!Env::Default()->path_exists(seg_path).ok()) { + auto fs = _rowset_meta->fs(); + if (!fs) { + return false; + } + bool seg_file_exist = false; + if (!fs->exists(seg_path, &seg_file_exist).ok() || !seg_file_exist) { LOG(WARNING) << "data file not existed: " << seg_path << " for rowset_id: " << rowset_id(); return false; diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index dde891d6cb..1d4153f701 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -46,8 +46,8 @@ public: std::string segment_cache_path(int segment_id); - static std::string local_segment_path(const std::string& tablet_path, const RowsetId& rowset_id, - int segment_id); + static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id, + int segment_id); static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id, int segment_id); @@ -55,8 +55,7 @@ public: static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id, int segment_id); - static std::string local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id, - int segment_id); + static std::string remote_tablet_path(int64_t tablet_id); Status split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, size_t key_num, diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 13532ee269..288d8b91f7 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -53,8 +53,8 @@ BetaRowsetWriter::~BetaRowsetWriter() { return; } for (int i = 0; i < _num_segment; ++i) { - auto seg_path = - BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i); + std::string seg_path = + BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i); // Even if an error is encountered, these files that have not been cleaned up // will be cleaned up by the GC background. So here we only print the error // message when we encounter an error. 
@@ -67,8 +67,13 @@ BetaRowsetWriter::~BetaRowsetWriter() { Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { _context = rowset_writer_context; _rowset_meta.reset(new RowsetMeta); - if (_context.data_dir) { + if (_context.fs == nullptr && _context.data_dir) { _rowset_meta->set_fs(_context.data_dir->fs()); + } else { + _rowset_meta->set_fs(_context.fs); + } + if (_context.fs != nullptr && _context.fs->resource_id().size() > 0) { + _rowset_meta->set_resource_id(_context.fs->resource_id()); } _rowset_meta->set_rowset_id(_context.rowset_id); _rowset_meta->set_partition_id(_context.partition_id); @@ -156,7 +161,7 @@ template Status BetaRowsetWriter::_add_row(const ContiguousRow& row); Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); - RETURN_NOT_OK(rowset->link_files_to(_context.tablet_path, _context.rowset_id)); + RETURN_NOT_OK(rowset->link_files_to(_context.rowset_dir, _context.rowset_id)); _num_rows_written += rowset->num_rows(); _total_data_size += rowset->rowset_meta()->data_disk_size(); _total_index_size += rowset->rowset_meta()->index_disk_size(); @@ -250,7 +255,7 @@ RowsetSharedPtr BetaRowsetWriter::build() { } RowsetSharedPtr rowset; - auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, + auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta, &rowset); if (!status.ok()) { LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; @@ -286,7 +291,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() { _build_rowset_meta(rowset_meta_); RowsetSharedPtr rowset; - auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, + auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, rowset_meta_, &rowset); if (!status.ok()) { LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; @@ -298,8 +303,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() { Status BetaRowsetWriter::_create_segment_writer( std::unique_ptr<segment_v2::SegmentWriter>* writer) { int32_t segment_id = _num_segment.fetch_add(1); - auto path = - BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, segment_id); + auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id); auto fs = _rowset_meta->fs(); if (!fs) { return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index bda106d462..32fda0e102 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -229,6 +229,8 @@ public: const std::string& tablet_path() const { return _tablet_path; } + virtual std::string rowset_dir() { return _rowset_dir; } + static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) { return left->end_version() < right->end_version(); } @@ -291,6 +293,7 @@ protected: TabletSchemaSPtr _schema; std::string _tablet_path; + std::string _rowset_dir; RowsetMetaSharedPtr _rowset_meta; // init in constructor bool _is_pending; // rowset is pending iff it's not in visible state diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 69b4f65911..7154116e7f 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -82,7 +82,7 @@ public: } // This method may return nullptr. 
- io::FileSystem* fs() { + io::FileSystemSPtr fs() { if (!_fs) { if (is_local()) { return io::global_local_filesystem(); @@ -91,10 +91,10 @@ public: LOG_IF(WARNING, !_fs) << "Cannot get file system: " << resource_id(); } } - return _fs.get(); + return _fs; } - void set_fs(io::FileSystemPtr fs) { _fs = std::move(fs); } + void set_fs(io::FileSystemSPtr fs) { _fs = std::move(fs); } const io::ResourceId& resource_id() const { return _rowset_meta_pb.resource_id(); } @@ -396,7 +396,7 @@ private: RowsetMetaPB _rowset_meta_pb; TabletSchemaSPtr _schema = nullptr; RowsetId _rowset_id; - io::FileSystemPtr _fs; + io::FileSystemSPtr _fs; bool _is_removed_from_rowset_meta = false; }; diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index b01e6fca46..8fef7bb16a 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -34,6 +34,7 @@ struct RowsetWriterContext { tablet_schema_hash(0), partition_id(0), rowset_type(ALPHA_ROWSET), + fs(nullptr), tablet_schema(nullptr), rowset_state(PREPARED), version(Version(0, 0)), @@ -54,7 +55,7 @@ struct RowsetWriterContext { context.partition_id = new_tablet->partition_id(); context.tablet_schema_hash = new_tablet->schema_hash(); context.rowset_type = new_rowset_type; - context.tablet_path = new_tablet->tablet_path(); + context.rowset_dir = new_tablet->tablet_path(); context.tablet_schema = new_tablet->tablet_schema(); context.data_dir = new_tablet->data_dir(); context.rowset_state = VISIBLE; @@ -69,7 +70,8 @@ struct RowsetWriterContext { int64_t tablet_schema_hash; int64_t partition_id; RowsetTypePB rowset_type; - std::string tablet_path; + io::FileSystemSPtr fs = nullptr; + std::string rowset_dir = ""; TabletSchemaSPtr tablet_schema; // PREPARED/COMMITTED for pending rowset // VISIBLE for non-pending rowset diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index cc59a15ae1..056665d295 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -42,7 +42,7 @@ namespace segment_v2 { using io::FileCacheManager; -Status Segment::open(io::FileSystem* fs, const std::string& path, const std::string& cache_path, +Status Segment::open(io::FileSystemSPtr fs, const std::string& path, const std::string& cache_path, uint32_t segment_id, TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output) { std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema)); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index fcdf965fd5..a3dcd8c6c6 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -61,9 +61,9 @@ using SegmentSharedPtr = std::shared_ptr<Segment>; // change finished, client should disable all cached Segment for old TabletSchema. 
class Segment : public std::enable_shared_from_this<Segment> { public: - static Status open(io::FileSystem* fs, const std::string& path, const std::string& cache_path, - uint32_t segment_id, TabletSchemaSPtr tablet_schema, - std::shared_ptr<Segment>* output); + static Status open(io::FileSystemSPtr fs, const std::string& path, + const std::string& cache_path, uint32_t segment_id, + TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output); ~Segment(); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 45a8e4268b..c4caf90567 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -2186,7 +2186,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams rs_reader->version(), VISIBLE, rs_reader->rowset()->rowset_meta()->segments_overlap(), new_tablet->tablet_schema(), rs_reader->oldest_write_timestamp(), rs_reader->newest_write_timestamp(), - &rowset_writer); + rs_reader->rowset()->rowset_meta()->fs(), &rowset_writer); if (!status.ok()) { res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT); return process_alter_exit(); @@ -2374,6 +2374,18 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params, *sc_directly = true; } + // if rs_reader has remote files, link schema change is not supported, + // use directly schema change instead. + if (!(*sc_directly) && !(*sc_sorting)) { + // check has remote rowset + for (auto& rs_reader : sc_params.ref_rowset_readers) { + if (!rs_reader->rowset()->is_local()) { + *sc_directly = true; + break; + } + } + } + return Status::OK(); } diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index a6f2fa0764..904ef62baf 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -255,7 +255,7 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, context.partition_id = org_rowset_meta->partition_id(); context.tablet_schema_hash = org_rowset_meta->tablet_schema_hash(); context.rowset_type = org_rowset_meta->rowset_type(); - context.tablet_path = new_tablet_path; + context.rowset_dir = new_tablet_path; context.tablet_schema = org_rowset_meta->tablet_schema() ? 
org_rowset_meta->tablet_schema() : tablet_schema; context.rowset_state = org_rowset_meta->rowset_state(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4524f1e89f..58f6f226d8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1661,6 +1661,16 @@ Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& TabletSchemaSPtr tablet_schema, int64_t oldest_write_timestamp, int64_t newest_write_timestamp, std::unique_ptr<RowsetWriter>* rowset_writer) { + return create_rowset_writer(version, rowset_state, overlap, tablet_schema, + oldest_write_timestamp, newest_write_timestamp, nullptr, + rowset_writer); +} + +Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, + const SegmentsOverlapPB& overlap, + TabletSchemaSPtr tablet_schema, int64_t oldest_write_timestamp, + int64_t newest_write_timestamp, io::FileSystemSPtr fs, + std::unique_ptr<RowsetWriter>* rowset_writer) { RowsetWriterContext context; context.version = version; context.rowset_state = rowset_state; @@ -1669,6 +1679,7 @@ Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& context.newest_write_timestamp = newest_write_timestamp; context.tablet_schema = tablet_schema; context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write(); + context.fs = fs; _init_context_common_fields(context); return RowsetFactory::create_rowset_writer(context, rowset_writer); } @@ -1704,7 +1715,11 @@ void Tablet::_init_context_common_fields(RowsetWriterContext& context) { if (context.rowset_type == ALPHA_ROWSET) { context.rowset_type = StorageEngine::instance()->default_rowset_type(); } - context.tablet_path = tablet_path(); + if (context.fs != nullptr && context.fs->type() != io::FileSystemType::LOCAL) { + context.rowset_dir = BetaRowset::remote_tablet_path(tablet_id()); + } else { + context.rowset_dir = tablet_path(); + } context.data_dir = data_dir(); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 938c1f38a2..318fdc8748 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -301,6 +301,12 @@ public: int64_t oldest_write_timestamp, int64_t newest_write_timestamp, std::unique_ptr<RowsetWriter>* rowset_writer); + Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state, + const SegmentsOverlapPB& overlap, TabletSchemaSPtr tablet_schema, + int64_t oldest_write_timestamp, int64_t newest_write_timestamp, + io::FileSystemSPtr fs, + std::unique_ptr<RowsetWriter>* rowset_writer); + Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id, const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap, TabletSchemaSPtr tablet_schema, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 6a3555575c..70348ba110 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -397,7 +397,7 @@ Status TabletMeta::deserialize(const string& meta_binary) { return Status::OK(); } -void TabletMeta::init_rs_metas_fs(const io::FileSystemPtr& fs) { +void TabletMeta::init_rs_metas_fs(const io::FileSystemSPtr& fs) { for (auto& rs_meta : _rs_metas) { if (rs_meta->is_local()) { rs_meta->set_fs(fs); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index d61a20dfa3..3655b71602 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -111,7 +111,7 @@ public: Status deserialize(const std::string& meta_binary); void init_from_pb(const TabletMetaPB& tablet_meta_pb); // Init `RowsetMeta._fs` if rowset 
is local. - void init_rs_metas_fs(const io::FileSystemPtr& fs); + void init_rs_metas_fs(const io::FileSystemSPtr& fs); void to_meta_pb(TabletMetaPB* tablet_meta_pb); void to_json(std::string* json_string, json2pb::Pb2JsonOptions& options); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index f8a27e4f6a..15be14b06e 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -168,14 +168,18 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_created_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_bytes_read_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_read_total, MetricUnit::FILESYSTEM); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_bytes_written_total, MetricUnit::FILESYSTEM); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -289,13 +293,17 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_created_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_bytes_read_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_read_total); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_bytes_written_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_written_total); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing); } void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index da9085671e..fe0689f23c 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -155,13 +155,17 @@ public: IntCounter* local_file_reader_total; IntCounter* s3_file_reader_total; IntCounter* local_file_writer_total; + IntCounter* s3_file_writer_total; IntCounter* file_created_total; + IntCounter* s3_file_created_total; IntCounter* local_bytes_read_total; IntCounter* 
s3_bytes_read_total; IntCounter* local_bytes_written_total; + IntCounter* s3_bytes_written_total; IntGauge* local_file_open_reading; IntGauge* s3_file_open_reading; IntGauge* local_file_open_writing; + IntGauge* s3_file_open_writing; // Size of some global containers UIntGauge* rowset_count_generated_and_in_use; diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index d105223f55..27b43fec3c 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -131,7 +131,7 @@ protected: rowset_writer_context->data_dir = _data_dir.get(); rowset_writer_context->rowset_state = VISIBLE; rowset_writer_context->tablet_schema = tablet_schema; - rowset_writer_context->tablet_path = "tablet_path"; + rowset_writer_context->rowset_dir = "tablet_path"; rowset_writer_context->version = Version(inc_id, inc_id); rowset_writer_context->segments_overlap = overlap; rowset_writer_context->max_rows_per_segment = max_rows_per_segment; diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 5dd53cd490..2df689fc04 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -147,7 +147,7 @@ protected: rowset_writer_context->tablet_schema_hash = 1111; rowset_writer_context->partition_id = 10; rowset_writer_context->rowset_type = BETA_ROWSET; - rowset_writer_context->tablet_path = kTestDir; + rowset_writer_context->rowset_dir = kTestDir; rowset_writer_context->rowset_state = VISIBLE; rowset_writer_context->tablet_schema = tablet_schema; rowset_writer_context->version.first = 10; diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp index 3d2d23abed..fc1e9cd62f 100644 --- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp +++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp @@ -35,6 +35,9 @@ #include "util/file_utils.h" namespace doris { + +using FileSystemSPtr = std::shared_ptr<io::FileSystem>; + namespace segment_v2 { using roaring::Roaring; @@ -56,7 +59,7 @@ public: }; template <FieldType type> -void write_index_file(const std::string& filename, io::FileSystem* fs, const void* values, +void write_index_file(const std::string& filename, FileSystemSPtr fs, const void* values, size_t value_count, size_t null_count, ColumnIndexMetaPB* meta) { const auto* type_info = get_scalar_type_info<type>(); { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
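
For reference, the new be/src/io/fs/s3_file_writer.{h,cpp} added by this commit drives the standard AWS S3 multipart-upload sequence: CreateMultipartUpload to obtain an upload id, UploadPart for each buffered chunk once the stream reaches MAX_SIZE_EACH_PART (5 MB), then CompleteMultipartUpload with the collected part numbers and ETags, or AbortMultipartUpload on failure. The minimal sketch below shows that sequence in isolation with a single part. It is not code from the patch: the helper function, bucket, and key are hypothetical, and it assumes Aws::InitAPI() has already been called and an S3 client has been constructed elsewhere.

    #include <aws/core/Aws.h>
    #include <aws/s3/S3Client.h>
    #include <aws/s3/model/CompleteMultipartUploadRequest.h>
    #include <aws/s3/model/CompletedMultipartUpload.h>
    #include <aws/s3/model/CompletedPart.h>
    #include <aws/s3/model/CreateMultipartUploadRequest.h>
    #include <aws/s3/model/UploadPartRequest.h>

    #include <memory>

    // Hypothetical helper, not part of the commit: upload `data` to s3://bucket/key
    // as a one-part multipart upload, mirroring S3FileWriter's request sequence.
    static bool upload_single_part(const std::shared_ptr<Aws::S3::S3Client>& client,
                                   const Aws::String& bucket, const Aws::String& key,
                                   const Aws::String& data) {
        // 1. CreateMultipartUpload returns the upload id that ties all parts together
        //    (S3FileWriter::_open does this lazily on the first append).
        Aws::S3::Model::CreateMultipartUploadRequest create_req;
        create_req.WithBucket(bucket).WithKey(key);
        auto create_out = client->CreateMultipartUpload(create_req);
        if (!create_out.IsSuccess()) {
            return false;
        }
        const Aws::String upload_id = create_out.GetResult().GetUploadId();

        // 2. UploadPart sends one buffered chunk; part numbers start at 1.
        //    S3FileWriter flushes a part whenever its stream reaches ~5 MB.
        Aws::S3::Model::UploadPartRequest part_req;
        part_req.WithBucket(bucket).WithKey(key).WithPartNumber(1).WithUploadId(upload_id);
        auto body = Aws::MakeShared<Aws::StringStream>("sketch", "");
        body->write(data.data(), static_cast<std::streamsize>(data.size()));
        part_req.SetBody(body);
        part_req.SetContentLength(static_cast<long>(data.size()));
        auto part_out = client->UploadPart(part_req);
        if (!part_out.IsSuccess()) {
            // A production writer would send AbortMultipartUploadRequest here,
            // as S3FileWriter::abort() does.
            return false;
        }

        // 3. CompleteMultipartUpload stitches the parts together from the
        //    (part number, ETag) pairs returned by each UploadPart call.
        Aws::S3::Model::CompletedPart done_part;
        done_part.SetPartNumber(1);
        done_part.SetETag(part_out.GetResult().GetETag());
        Aws::S3::Model::CompletedMultipartUpload done;
        done.AddParts(done_part);
        Aws::S3::Model::CompleteMultipartUploadRequest complete_req;
        complete_req.WithBucket(bucket).WithKey(key).WithUploadId(upload_id);
        complete_req.WithMultipartUpload(done);
        return client->CompleteMultipartUpload(complete_req).IsSuccess();
    }

Buffering appends into roughly 5 MB parts keeps memory use bounded when writing large segment files to S3, and the upload id gives the writer a clean abort path: ~S3FileWriter() calls abort() if the file was never closed, so interrupted uploads can be discarded server-side rather than left behind as partial objects.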