This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0134e9d2f4 [Improvement](runtime filter) Reduce merging time for bloom filter (#13668) 0134e9d2f4 is described below commit 0134e9d2f4010b2f540d53b593f34733160bb19d Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Oct 27 00:02:05 2022 +0800 [Improvement](runtime filter) Reduce merging time for bloom filter (#13668) --- be/src/exprs/block_bloom_filter.hpp | 17 +++++++++++------ be/src/exprs/block_bloom_filter_avx_impl.cc | 1 + be/src/exprs/block_bloom_filter_impl.cc | 27 +++++++++++++++++++-------- be/src/exprs/bloomfilter_predicate.h | 16 ++++++++++------ be/src/exprs/runtime_filter.cpp | 2 +- be/src/exprs/runtime_filter.h | 14 ++++++++++++-- be/src/runtime/fragment_mgr.cpp | 8 +++++--- be/src/runtime/fragment_mgr.h | 10 ++++++++-- be/src/runtime/runtime_filter_mgr.cpp | 15 +++++---------- be/src/runtime/runtime_filter_mgr.h | 10 ++++++++-- be/src/service/internal_service.cpp | 11 +++++++---- 11 files changed, 87 insertions(+), 44 deletions(-) diff --git a/be/src/exprs/block_bloom_filter.hpp b/be/src/exprs/block_bloom_filter.hpp index b75350b56e..2dd6e3cb4f 100644 --- a/be/src/exprs/block_bloom_filter.hpp +++ b/be/src/exprs/block_bloom_filter.hpp @@ -21,10 +21,14 @@ #pragma once #include "common/status.h" -#include "gutil/macros.h" +#include "fmt/format.h" #include "util/hash_util.hpp" #include "util/slice.h" +namespace butil { +class IOBufAsZeroCopyInputStream; +} + namespace doris { // https://github.com/apache/kudu/blob/master/src/kudu/util/block_bloom_filter.h @@ -40,11 +44,14 @@ public: explicit BlockBloomFilter(); ~BlockBloomFilter(); + BlockBloomFilter(const BlockBloomFilter&) = delete; + BlockBloomFilter& operator=(const BlockBloomFilter&) = delete; + Status init(int log_space_bytes, uint32_t hash_seed); // Initialize the BlockBloomFilter from a populated "directory" structure. // Useful for initializing the BlockBloomFilter by de-serializing a custom protobuf message. 
- Status init_from_directory(int log_space_bytes, const Slice& directory, bool always_false, - uint32_t hash_seed); + Status init_from_directory(int log_space_bytes, butil::IOBufAsZeroCopyInputStream* data, + const size_t data_size, bool always_false, uint32_t hash_seed); void close(); @@ -176,7 +183,7 @@ private: #endif // Size of the internal directory structure in bytes. - int64_t directory_size() const { return 1ULL << log_space_bytes(); } + size_t directory_size() const { return 1ULL << log_space_bytes(); } // Some constants used in hashing. #defined for efficiency reasons. #define BLOOM_HASH_CONSTANTS \ @@ -200,8 +207,6 @@ private: // Rehash32to32(hash2) is minimal. return (static_cast<uint64_t>(hash) * m + a) >> 32U; } - - DISALLOW_COPY_AND_ASSIGN(BlockBloomFilter); }; } // namespace doris diff --git a/be/src/exprs/block_bloom_filter_avx_impl.cc b/be/src/exprs/block_bloom_filter_avx_impl.cc index e005b4e9c6..b6512f9848 100644 --- a/be/src/exprs/block_bloom_filter_avx_impl.cc +++ b/be/src/exprs/block_bloom_filter_avx_impl.cc @@ -23,6 +23,7 @@ #include <immintrin.h> #include "exprs/block_bloom_filter.hpp" +#include "gutil/macros.h" namespace doris { static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i make_mark( diff --git a/be/src/exprs/block_bloom_filter_impl.cc b/be/src/exprs/block_bloom_filter_impl.cc index e553694c5a..b619b239a4 100644 --- a/be/src/exprs/block_bloom_filter_impl.cc +++ b/be/src/exprs/block_bloom_filter_impl.cc @@ -26,6 +26,8 @@ #include <mm_malloc.h> #endif +#include <butil/iobuf.h> + #include <algorithm> #include <climits> #include <cmath> @@ -87,17 +89,26 @@ Status BlockBloomFilter::init(const int log_space_bytes, uint32_t hash_seed) { return Status::OK(); } -Status BlockBloomFilter::init_from_directory(int log_space_bytes, const Slice& directory, - bool always_false, uint32_t hash_seed) { +Status BlockBloomFilter::init_from_directory(int log_space_bytes, + butil::IOBufAsZeroCopyInputStream* data, + const 
size_t data_size, bool always_false, + uint32_t hash_seed) { RETURN_IF_ERROR(init_internal(log_space_bytes, hash_seed)); DCHECK(_directory); - if (directory_size() != directory.size) { - return Status::InvalidArgument( + if (directory_size() != data_size) { + return Status::InvalidArgument(fmt::format( "Mismatch in BlockBloomFilter source directory size {} and expected size {}", - directory.size, directory_size()); + data_size, directory_size())); + } + int size = 0; + char* tmp; + const void** ptr = (const void**)&tmp; + char* data_ptr = reinterpret_cast<char*>(_directory); + while (data->Next(ptr, &size)) { + memcpy(data_ptr, *ptr, size); + data_ptr += size; } - memcpy(_directory, directory.data, directory.size); _always_false = always_false; return Status::OK(); } @@ -240,8 +251,8 @@ Status BlockBloomFilter::merge(const BlockBloomFilter& other) { return Status::OK(); } - or_equal_array_internal(directory_size(), reinterpret_cast<const uint8*>(other._directory), - reinterpret_cast<uint8*>(_directory)); + or_equal_array_internal(directory_size(), reinterpret_cast<const uint8_t*>(other._directory), + reinterpret_cast<uint8_t*>(_directory)); _always_false = false; return Status::OK(); diff --git a/be/src/exprs/bloomfilter_predicate.h b/be/src/exprs/bloomfilter_predicate.h index 5ce3ce47d5..545c40cc5b 100644 --- a/be/src/exprs/bloomfilter_predicate.h +++ b/be/src/exprs/bloomfilter_predicate.h @@ -33,6 +33,10 @@ #include "olap/uint24.h" #include "util/hash_util.hpp" +namespace butil { +class IOBufAsZeroCopyInputStream; +} + namespace doris { class BloomFilterAdaptor { public: @@ -50,9 +54,9 @@ public: return _bloom_filter->init(log_space, /*hash_seed*/ 0); } - Status init(const char* data, int len) { - int log_space = log2(len); - return _bloom_filter->init_from_directory(log_space, Slice(data, len), false, 0); + Status init(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) { + int log_space = log2(data_size); + return 
_bloom_filter->init_from_directory(log_space, data, data_size, false, 0); } char* data() { return (char*)_bloom_filter->directory().data; } @@ -161,13 +165,13 @@ public: } } - Status assign(const char* data, int len) { + Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) { if (_bloom_filter == nullptr) { _bloom_filter.reset(BloomFilterAdaptor::create()); } - _bloom_filter_alloced = len; - return _bloom_filter->init(data, len); + _bloom_filter_alloced = data_size; + return _bloom_filter->init(data, data_size); } Status get_data(char** data, int* len) { diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1b5ca935e5..77a476d095 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -862,7 +862,7 @@ public: // used by shuffle runtime filter // assign this filter by protobuf - Status assign(const PBloomFilter* bloom_filter, const char* data) { + Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data) { _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 677321ea4d..42584d64f9 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -27,6 +27,10 @@ #include "util/time.h" #include "util/uid_util.h" +namespace butil { +class IOBufAsZeroCopyInputStream; +} + namespace doris { class Predicate; class ObjectPool; @@ -97,14 +101,20 @@ struct RuntimeFilterParams { }; struct UpdateRuntimeFilterParams { + UpdateRuntimeFilterParams(const PPublishFilterRequest* req, + butil::IOBufAsZeroCopyInputStream* data_stream, ObjectPool* obj_pool) + : request(req), data(data_stream), pool(obj_pool) {} const PPublishFilterRequest* request; - const char* data; + butil::IOBufAsZeroCopyInputStream* data; ObjectPool* pool; }; struct MergeRuntimeFilterParams { + MergeRuntimeFilterParams(const PMergeFilterRequest* req, + 
butil::IOBufAsZeroCopyInputStream* data_stream) + : request(req), data(data_stream) {} const PMergeFilterRequest* request; - const char* data; + butil::IOBufAsZeroCopyInputStream* data; }; /// The runtimefilter is built in the join node. diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7b5c3af432..511979d2a8 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -921,7 +921,8 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, return exec_plan_fragment(exec_fragment_params); } -Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const char* data) { +Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId fragment_instance_id = request->fragment_id(); TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); std::shared_ptr<FragmentExecState> fragment_state; @@ -946,11 +947,12 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, const cha RuntimeFilterMgr* runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr(); - Status st = runtime_filter_mgr->update_filter(request, data); + Status st = runtime_filter_mgr->update_filter(request, attach_data); return st; } -Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, const char* attach_data) { +Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId queryid = request->query_id(); std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index fd54afacd2..08be2edc0c 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -36,6 +36,10 @@ #include "util/metrics.h" #include "util/thread.h" 
+namespace butil { +class IOBufAsZeroCopyInputStream; +} + namespace doris { class QueryFragmentsCtx; @@ -85,9 +89,11 @@ public: const TUniqueId& fragment_instance_id, std::vector<TScanColumnDesc>* selected_columns); - Status apply_filter(const PPublishFilterRequest* request, const char* attach_data); + Status apply_filter(const PPublishFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data); - Status merge_filter(const PMergeFilterRequest* request, const char* attach_data); + Status merge_filter(const PMergeFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data); void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<StreamLoadPipe> pipe); diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 5175ed580c..0ed812737f 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -110,13 +110,10 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt return Status::OK(); } - -Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) { +Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* data) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); - UpdateRuntimeFilterParams params; - params.request = request; - params.data = data; - params.pool = &_pool; + UpdateRuntimeFilterParams params(request, data, &_pool); int filter_id = request->filter_id(); IRuntimeFilter* real_filter = nullptr; RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter)); @@ -185,7 +182,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag // merge data Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, - const char* data) { + butil::IOBufAsZeroCopyInputStream* attach_data) { // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); std::shared_ptr<RuntimeFilterCntlVal> cntVal; int 
merged_size = 0; @@ -201,9 +198,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ if (auto bf = cntVal->filter->get_bloomfilter()) { RETURN_IF_ERROR(bf->init_with_fixed_length()); } - MergeRuntimeFilterParams params; - params.data = data; - params.request = request; + MergeRuntimeFilterParams params(request, attach_data); ObjectPool* pool = iter->second->pool.get(); RuntimeFilterWrapperHolder holder; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool, holder.getHandle())); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 0db00f2331..a3b3add74b 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -32,6 +32,10 @@ #include "util/time.h" #include "util/uid_util.h" +namespace butil { +class IOBufAsZeroCopyInputStream; +} + namespace doris { class TUniqueId; class RuntimeFilter; @@ -69,7 +73,8 @@ public: const TQueryOptions& options, int node_id = -1); // update filter by remote - Status update_filter(const PPublishFilterRequest* request, const char* data); + Status update_filter(const PPublishFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* data); void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); @@ -113,7 +118,8 @@ public: const TQueryOptions& query_options); // handle merge rpc - Status merge(const PMergeFilterRequest* request, const char* data); + Status merge(const PMergeFilterRequest* request, + butil::IOBufAsZeroCopyInputStream* attach_data); UniqueId query_id() { return _query_id; } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0e7c1d1002..9d0c4d0383 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -17,6 +17,8 @@ #include "service/internal_service.h" +#include <butil/iobuf.h> + #include <string> #include "common/config.h" @@ -493,8 +495,9 @@ void
PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { brpc::ClosureGuard closure_guard(done); - auto buf = static_cast<brpc::Controller*>(controller)->request_attachment(); - Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); + auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream); if (!st.ok()) { LOG(WARNING) << "merge meet error" << st.to_string(); } @@ -507,10 +510,10 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr ::google::protobuf::Closure* done) { brpc::ClosureGuard closure_guard(done); auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); UniqueId unique_id(request->query_id()); - // TODO: avoid copy attachment copy VLOG_NOTICE << "rpc apply_filter recv"; - Status st = _exec_env->fragment_mgr()->apply_filter(request, attachment.to_string().data()); + Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream); if (!st.ok()) { LOG(WARNING) << "apply filter meet error: " << st.to_string(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org