(doris) branch branch-2.1 updated: [improvement](segmentcache) limit segment cache by memory or segment … (#37035)
This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 07278e9dcb6 [improvement](segmentcache) limit segment cache by memory or segment … (#37035) 07278e9dcb6 is described below commit 07278e9dcb6df6351e90be7a22599aa1132d5ff1 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Sun Jun 30 20:34:13 2024 +0800 [improvement](segmentcache) limit segment cache by memory or segment … (#37035) …num (#37026) pick ##37026 --- be/src/common/config.cpp | 2 +- be/src/olap/lru_cache.cpp | 5 + be/src/olap/lru_cache.h| 4 +++- be/src/olap/segment_loader.h | 11 +++ be/src/runtime/exec_env_init.cpp | 8 be/test/testutil/run_all_tests.cpp | 2 +- 6 files changed, 21 insertions(+), 11 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 493ad699aac..7a8c63db748 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1052,7 +1052,7 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100"); // max number of segment cache, default -1 for backward compatibility fd_number*2/5 DEFINE_mInt32(segment_cache_capacity, "-1"); -DEFINE_mInt32(estimated_num_columns_per_segment, "30"); +DEFINE_mInt32(estimated_num_columns_per_segment, "200"); DEFINE_mInt32(estimated_mem_per_column_reader, "1024"); // The value is calculate by storage_page_cache_limit * index_page_cache_percentage DEFINE_mInt32(segment_cache_memory_percentage, "2"); diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp index 031082f6da8..741c2423915 100644 --- a/be/src/olap/lru_cache.cpp +++ b/be/src/olap/lru_cache.cpp @@ -22,6 +22,7 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_capacity, MetricUnit::BYTES); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_element_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage_ratio, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_lookup_count, MetricUnit::OPERATIONS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_hit_count, MetricUnit::OPERATIONS); @@ -542,6 +543,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, _entity->register_hook(name, std::bind(&ShardedLRUCache::update_cache_metrics, this)); INT_GAUGE_METRIC_REGISTER(_entity, cache_capacity); INT_GAUGE_METRIC_REGISTER(_entity, cache_usage); +INT_GAUGE_METRIC_REGISTER(_entity, cache_element_count); INT_DOUBLE_METRIC_REGISTER(_entity, cache_usage_ratio); INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_lookup_count); INT_ATOMIC_COUNTER_METRIC_REGISTER(_entity, cache_hit_count); @@ -640,15 +642,18 @@ void ShardedLRUCache::update_cache_metrics() const { size_t total_usage = 0; size_t total_lookup_count = 0; size_t total_hit_count = 0; +size_t total_element_count = 0; for (int i = 0; i < _num_shards; i++) { total_capacity += _shards[i]->get_capacity(); total_usage += _shards[i]->get_usage(); total_lookup_count += _shards[i]->get_lookup_count(); total_hit_count += _shards[i]->get_hit_count(); +total_element_count += _shards[i]->get_element_count(); } cache_capacity->set_value(total_capacity); cache_usage->set_value(total_usage); +cache_element_count->set_value(total_element_count); cache_lookup_count->set_value(total_lookup_count); cache_hit_count->set_value(total_hit_count); cache_usage_ratio->set_value(total_capacity == 0 ? 
0 : ((double)total_usage / total_capacity)); diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h index 50676921044..059020deab5 100644 --- a/be/src/olap/lru_cache.h +++ b/be/src/olap/lru_cache.h @@ -60,7 +60,7 @@ enum LRUCacheType { }; static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE; -static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 16; +static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 32; static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0; class CacheKey { @@ -349,6 +349,7 @@ public: uint64_t get_hit_count() const { return _hit_count; } size_t get_usage() const { return _usage; } size_t get_capacity() const { return _capacity; } +size_t get_element_count() const { return _table.element_count(); } private: void _lru_remove(LRUHandle* e); @@ -433,6 +434,7 @@ private: std::shared_ptr _entity; IntGauge* cache_capacity = nullptr; IntGauge* cache_usage = nullptr; +IntGauge* cache_element_count = nullptr; DoubleGauge* cache_usage_ratio = nullptr; IntAtomicCounter* cache_lookup_count = nullptr; IntAtomicCounter* cache_hit_count = nullptr; diff --gi
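A note for readers of this digest: the patch above bumps the default LRU shard count and adds a cache_element_count gauge that update_cache_metrics sums across all shards. The sketch below shows that shard-summing pattern in isolation, as a minimal self-contained program; Shard, ShardedCache and the metric names here are illustrative stand-ins, not Doris classes.

```cpp
// Minimal sketch of aggregating per-shard usage and element-count metrics,
// in the spirit of ShardedLRUCache::update_cache_metrics in the patch above.
// Names (Shard, ShardedCache) are illustrative, not Doris APIs.
#include <array>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

struct Shard {
    std::mutex mu;
    std::unordered_map<std::string, std::string> items;  // stand-in for LRU entries
    size_t usage_bytes = 0;

    void insert(const std::string& key, const std::string& value) {
        std::lock_guard<std::mutex> l(mu);
        usage_bytes += key.size() + value.size();
        items[key] = value;
    }
    size_t element_count() {
        std::lock_guard<std::mutex> l(mu);
        return items.size();
    }
    size_t usage() {
        std::lock_guard<std::mutex> l(mu);
        return usage_bytes;
    }
};

class ShardedCache {
public:
    static constexpr uint32_t kNumShards = 32;  // mirrors the DEFAULT_LRU_CACHE_NUM_SHARDS bump

    void insert(const std::string& key, const std::string& value) {
        _shards[std::hash<std::string>{}(key) % kNumShards].insert(key, value);
    }

    // Sum per-shard counters into totals, as the metric hook does.
    void update_metrics(size_t* total_usage, size_t* total_elements) {
        *total_usage = 0;
        *total_elements = 0;
        for (auto& s : _shards) {
            *total_usage += s.usage();
            *total_elements += s.element_count();
        }
    }

private:
    std::array<Shard, kNumShards> _shards;
};

int main() {
    ShardedCache cache;
    for (int i = 0; i < 1000; ++i) {
        cache.insert("segment_" + std::to_string(i), "reader");
    }
    size_t usage = 0, elements = 0;
    cache.update_metrics(&usage, &elements);
    std::cout << "cache_usage=" << usage << " cache_element_count=" << elements << "\n";
    return 0;
}
```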
(doris) branch master updated: [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033)
This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 201602d3d1f [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033) 201602d3d1f is described below commit 201602d3d1f50eedb8934d808034cb904c379cfb Author: Gavin Chou AuthorDate: Sun Jun 30 20:37:05 2024 +0800 [streamload](2pc) Fix 2pc stream load txn in cloud mode (#37033) Abort load txn with label only should be forwarded to FE master to handle due to lack of db id. --- be/src/cloud/cloud_meta_mgr.cpp| 6 +- be/src/cloud/cloud_stream_load_executor.cpp| 69 -- be/src/common/config.cpp | 2 +- .../runtime/stream_load/stream_load_executor.cpp | 42 ++--- .../apache/doris/service/FrontendServiceImpl.java | 2 +- .../load_p0/stream_load/test_stream_load.groovy| 1 + 6 files changed, 89 insertions(+), 33 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index f0a377cba67..732f3023e91 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -839,8 +839,12 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { if (ctx.db_id > 0 && !ctx.label.empty()) { req.set_db_id(ctx.db_id); req.set_label(ctx.label); -} else { +} else if (ctx.txn_id > 0) { req.set_txn_id(ctx.txn_id); +} else { +LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id + << " txn_id=" << ctx.txn_id << " label=" << ctx.label; +return Status::InternalError("failed to abort txn"); } return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); } diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index b7d428e59a4..92fb73eacc1 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -26,6 +26,12 @@ namespace doris { +enum class TxnOpParamType : int { +ILLEGAL, +WITH_TXN_ID, +WITH_LABEL, +}; + CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env) : StreamLoadExecutor(exec_env) {} @@ -42,13 +48,48 @@ Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { } Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { -VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation; +std::stringstream ss; +ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label + << " txn_2pc_op=" << ctx->txn_operation; +std::string op_info = ss.str(); +VLOG_DEBUG << "operate_txn_2pc " << op_info; +TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID + : !ctx->label.empty() ? 
TxnOpParamType::WITH_LABEL +: TxnOpParamType::ILLEGAL; + +Status st = Status::InternalError("impossible branch reached, " + op_info); + if (ctx->txn_operation.compare("commit") == 0) { -return _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); +if (topt == TxnOpParamType::WITH_TXN_ID) { +VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info; +st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); +} else if (topt == TxnOpParamType::WITH_LABEL) { +VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info; +st = StreamLoadExecutor::operate_txn_2pc(ctx); +} else { +st = Status::InternalError( +"failed to 2pc commit txn, with TxnOpParamType::illegal input, " + op_info); +} +} else if (ctx->txn_operation.compare("abort") == 0) { +if (topt == TxnOpParamType::WITH_TXN_ID) { +LOG(INFO) << "2pc abort stream load txn directly: " << op_info; +st = _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); +WARN_IF_ERROR(st, "failed to rollback txn " + op_info); +} else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send to FE to abort +VLOG_DEBUG << "2pc abort stream load txn with FE support: " << op_info; +StreamLoadExecutor::rollback_txn(ctx); +st = Status::OK(); +} else { +st = Status::InternalError("failed abort txn, with illegal input, " + op_info); +} } else { -// 2pc abort -return _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); +std::string msg = +"failed to operate_txn_2pc, unrecognized operation: " + ctx->txn_operation; +LOG(WARNING) << msg << " " << op_info; +st = Status::In
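The core of the fix above is how a 2PC commit or abort request is routed: a request that carries a txn id can go straight to the meta service, while a label-only request lacks the db id and must be forwarded to the FE master. Below is a minimal sketch of that dispatch, assuming a placeholder StreamLoadContext and plain strings instead of the real Status type.

```cpp
// Sketch of the txn-id vs. label dispatch used by operate_txn_2pc above.
// StreamLoadContext here is a placeholder; the returned strings describe
// which path the real code would take.
#include <cstdint>
#include <iostream>
#include <string>

enum class TxnOpParamType { ILLEGAL, WITH_TXN_ID, WITH_LABEL };

struct StreamLoadContext {
    int64_t db_id = -1;
    int64_t txn_id = -1;
    std::string label;
    std::string txn_operation;  // "commit" or "abort"
};

std::string operate_txn_2pc(const StreamLoadContext& ctx) {
    // Prefer the txn id; fall back to the label; otherwise the request is malformed.
    TxnOpParamType topt = ctx.txn_id > 0       ? TxnOpParamType::WITH_TXN_ID
                          : !ctx.label.empty() ? TxnOpParamType::WITH_LABEL
                                               : TxnOpParamType::ILLEGAL;
    if (topt == TxnOpParamType::ILLEGAL) {
        return "error: neither txn_id nor label provided";
    }
    if (ctx.txn_operation == "commit") {
        return topt == TxnOpParamType::WITH_TXN_ID
                       ? "commit directly against meta service"
                       : "forward commit to FE (label only, db id unknown)";
    }
    if (ctx.txn_operation == "abort") {
        return topt == TxnOpParamType::WITH_TXN_ID
                       ? "abort directly against meta service"
                       : "forward abort to FE (label only, db id unknown)";
    }
    return "error: unrecognized operation " + ctx.txn_operation;
}

int main() {
    StreamLoadContext by_label;
    by_label.label = "load_123";
    by_label.txn_operation = "abort";

    StreamLoadContext by_txn;
    by_txn.db_id = 10;
    by_txn.txn_id = 42;
    by_txn.txn_operation = "commit";

    std::cout << operate_txn_2pc(by_label) << "\n" << operate_txn_2pc(by_txn) << "\n";
}
```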
(doris) branch branch-2.1 updated: [fix](clone) Fix clone and alter tablet use same tablet path #34889 (#36858)
This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 92cbbd2b75c [fix](clone) Fix clone and alter tablet use same tablet path #34889 (#36858) 92cbbd2b75c is described below commit 92cbbd2b75ced6fb0756b0bb462ac1daab600b99 Author: deardeng <565620...@qq.com> AuthorDate: Sun Jun 30 20:40:54 2024 +0800 [fix](clone) Fix clone and alter tablet use same tablet path #34889 (#36858) cherry pick from #34889 --- be/src/olap/data_dir.cpp | 25 - be/src/olap/data_dir.h | 4 +- be/src/olap/storage_engine.cpp | 2 + be/src/olap/tablet_manager.cpp | 123 ++--- be/src/olap/tablet_manager.h | 16 ++- be/src/olap/task/engine_clone_task.cpp | 31 -- be/src/olap/task/engine_storage_migration_task.cpp | 8 ++ .../test_drop_clone_tablet_path_race.groovy| 82 ++ 8 files changed, 253 insertions(+), 38 deletions(-) diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 03027184357..37dd76c848c 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -663,7 +663,7 @@ Status DataDir::load() { } // gc unused local tablet dir -void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path) { +void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_id) { if (_stop_bg_worker) { return; } @@ -681,12 +681,11 @@ void DataDir::_perform_tablet_gc(const std::string& tablet_schema_hash_path) { if (!tablet || tablet->data_dir() != this) { if (tablet) { LOG(INFO) << "The tablet in path " << tablet_schema_hash_path - << " is not same with the running one: " << tablet->data_dir()->_path << "/" - << tablet->tablet_path() + << " is not same with the running one: " << tablet->tablet_path() << ", might be the old tablet after migration, try to move it to trash"; } StorageEngine::instance()->tablet_manager()->try_delete_unused_tablet_path( -this, tablet_id, schema_hash, tablet_schema_hash_path); +this, tablet_id, schema_hash, tablet_schema_hash_path, shard_id); return; } @@ -855,7 +854,14 @@ void DataDir::perform_path_gc() { std::this_thread::sleep_for( std::chrono::milliseconds(config::path_gc_check_step_interval_ms)); } -_perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name); +int16_t shard_id = -1; +try { +shard_id = std::stoi(shard.file_name); +} catch (const std::exception&) { +LOG(WARNING) << "failed to stoi shard_id, shard name=" << shard.file_name; +continue; +} +_perform_tablet_gc(tablet_id_path + '/' + schema_hash.file_name, shard_id); } } } @@ -957,8 +963,16 @@ Status DataDir::move_to_trash(const std::string& tablet_path) { } // 5. 
check parent dir of source file, delete it when empty +RETURN_IF_ERROR(delete_tablet_parent_path_if_empty(tablet_path)); + +return Status::OK(); +} + +Status DataDir::delete_tablet_parent_path_if_empty(const std::string& tablet_path) { +auto fs_tablet_path = io::Path(tablet_path); std::string source_parent_dir = fs_tablet_path.parent_path(); // tablet_id level std::vector sub_files; +bool exists = true; RETURN_IF_ERROR( io::global_local_filesystem()->list(source_parent_dir, false, &sub_files, &exists)); if (sub_files.empty()) { @@ -966,7 +980,6 @@ Status DataDir::move_to_trash(const std::string& tablet_path) { // no need to exam return status RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(source_parent_dir)); } - return Status::OK(); } diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 46abc75934d..38af2f18d52 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -145,6 +145,8 @@ public: // Move tablet to trash. Status move_to_trash(const std::string& tablet_path); +static Status delete_tablet_parent_path_if_empty(const std::string& tablet_path); + private: Status _init_cluster_id(); Status _init_capacity_and_create_shards(); @@ -161,7 +163,7 @@ private: int _path_gc_step {0}; -void _perform_tablet_gc(const std::string& tablet_schema_hash_path); +void _perform_tablet_gc(const std::string& tablet_schema_hash_path, int16_t shard_name); void _perform_rowset_gc(const std::string& t
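Two small techniques in the patch above are worth noting: the shard id is parsed defensively from the directory name, and the tablet_id-level parent directory is removed once it becomes empty (now factored into delete_tablet_parent_path_if_empty). Here is a self-contained sketch of both, using std::filesystem rather than Doris' local filesystem wrapper; the paths and helper names are made up for the example.

```cpp
// Sketch of (1) parsing a shard id from a directory name defensively and
// (2) removing a tablet's parent directory once it has no children, mirroring
// the intent of DataDir::delete_tablet_parent_path_if_empty above.
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <string>

namespace fs = std::filesystem;

// Returns -1 when the directory name is not a number (e.g. stray files under the data root).
int16_t parse_shard_id(const std::string& dir_name) {
    try {
        return static_cast<int16_t>(std::stoi(dir_name));
    } catch (const std::exception&) {
        std::cerr << "failed to parse shard id from dir name=" << dir_name << "\n";
        return -1;
    }
}

// Delete the tablet_id-level directory when it no longer contains anything.
bool delete_tablet_parent_path_if_empty(const fs::path& tablet_path) {
    fs::path parent = tablet_path.parent_path();  // .../shard_id/tablet_id
    std::error_code ec;
    if (!fs::exists(parent, ec) || !fs::is_empty(parent, ec)) {
        return false;  // still holds another schema-hash dir, or already gone
    }
    std::cout << "remove empty tablet parent dir: " << parent << "\n";
    return fs::remove(parent, ec);
}

int main() {
    std::cout << parse_shard_id("42") << " " << parse_shard_id("not_a_shard") << "\n";

    // Fabricated layout: <tmp>/doris_sketch/<shard>/<tablet_id>/<schema_hash>
    fs::path schema_hash_dir = fs::temp_directory_path() / "doris_sketch" / "1" / "10001" / "567890";
    fs::create_directories(schema_hash_dir);
    fs::remove(schema_hash_dir);  // schema-hash dir moved to trash -> parent becomes empty
    delete_tablet_parent_path_if_empty(schema_hash_dir);
    return 0;
}
```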
(doris-website) branch asf-site updated (8b1a63dc0a -> 77ac457f1d)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/doris-website.git

 discard 8b1a63dc0a Automated deployment with doris branch @ 618ebdb2a0c5f1114aa69300afa656cac9ce1315
     new 77ac457f1d Automated deployment with doris branch @ 618ebdb2a0c5f1114aa69300afa656cac9ce1315

This update added new revisions after undoing existing revisions. That is
to say, some revisions that were in the old version of the branch are not
in the new version. This situation occurs when a user --force pushes a
change and generates a repository containing something like this:

 * -- * -- B -- O -- O -- O   (8b1a63dc0a)
            \
             N -- N -- N   refs/heads/asf-site (77ac457f1d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions from
the common base, B.

Any revisions marked "omit" are not gone; other references still refer to
them. Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this repository
and will be described in separate emails. The revisions listed as "add"
were already present in the repository and have only been added to this
reference.

Summary of changes:
 docs/1.2/search-index.json       | 2 +-
 docs/2.0/search-index.json       | 2 +-
 docs/dev/search-index.json       | 2 +-
 search-index.json                | 2 +-
 zh-CN/docs/1.2/search-index.json | 2 +-
 zh-CN/docs/2.0/search-index.json | 2 +-
 zh-CN/docs/dev/search-index.json | 2 +-
 zh-CN/search-index.json          | 2 +-
 8 files changed, 8 insertions(+), 8 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated: [Refactor](Recycler) Refactor azure obj client's batch delete function to check the delete response (#37037)
This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new b65f1c3291d [Refactor](Recycler) Refactor azure obj client's batch delete function to check the delete response (#37037) b65f1c3291d is described below commit b65f1c3291d134bf73e7929cab001a4b796aad62 Author: AlexYue AuthorDate: Mon Jul 1 02:54:45 2024 +0800 [Refactor](Recycler) Refactor azure obj client's batch delete function to check the delete response (#37037) This pr use #36590's new code to refactor batch delete for Azure obj client. --- cloud/src/recycler/azure_obj_client.cpp | 42 +++-- cloud/test/mock_accessor.cpp| 2 -- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp index 02f906f1cef..60cd79abb95 100644 --- a/cloud/src/recycler/azure_obj_client.cpp +++ b/cloud/src/recycler/azure_obj_client.cpp @@ -34,9 +34,12 @@ #include "common/logging.h" #include "common/sync_point.h" +using namespace Azure::Storage::Blobs; + namespace doris::cloud { static constexpr size_t BlobBatchMaxOperations = 256; +static constexpr char BlobNotFound[] = "BlobNotFound"; template ObjectStorageResponse do_azure_client_call(Func f, std::string_view url, std::string_view key) { @@ -55,8 +58,7 @@ ObjectStorageResponse do_azure_client_call(Func f, std::string_view url, std::st class AzureListIterator final : public ObjectListIterator { public: - AzureListIterator(std::shared_ptr client, - std::string prefix) +AzureListIterator(std::shared_ptr client, std::string prefix) : client_(std::move(client)), req_({.Prefix = std::move(prefix)}) { TEST_SYNC_POINT_CALLBACK("AzureListIterator", &req_); } @@ -116,8 +118,8 @@ public: } private: -std::shared_ptr client_; -Azure::Storage::Blobs::ListBlobsOptions req_; +std::shared_ptr client_; +ListBlobsOptions req_; std::vector results_; bool is_valid_ {true}; bool has_more_ {true}; @@ -181,14 +183,35 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, TEST_SYNC_POINT_CALLBACK("AzureObjClient::delete_objects", &batch_size); std::advance(chunk_end, std::min(batch_size, static_cast(std::distance(begin, end; + std::vector> deferred_resps; +deferred_resps.reserve(std::distance(begin, chunk_end)); for (auto it = begin; it != chunk_end; ++it) { -batch.DeleteBlob(*it); +deferred_resps.emplace_back(batch.DeleteBlob(*it)); } auto resp = do_azure_client_call([&]() { client_->SubmitBatch(batch); }, client_->GetUrl(), *begin); if (resp.ret != 0) { return resp; } +for (auto&& defer : deferred_resps) { +try { +auto r = defer.GetResponse(); +if (!r.Value.Deleted) { +LOG_INFO("Azure batch delete failed, url {}", client_->GetUrl()); +return {-1}; +} +} catch (Azure::Storage::StorageException& e) { +if (Azure::Core::Http::HttpStatusCode::NotFound == e.StatusCode && +0 == strcmp(e.ErrorCode.c_str(), BlobNotFound)) { +continue; +} +auto msg = fmt::format( +"Azure request failed because {}, http code {}, request id {}, url {}", +e.Message, static_cast(e.StatusCode), e.RequestId, client_->GetUrl()); +LOG_WARNING(msg); +return {-1, std::move(msg)}; +} +} begin = chunk_end; } @@ -197,8 +220,13 @@ ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket, } ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) { -return do_azure_client_call([&]() { client_->DeleteBlob(path.key); }, 
client_->GetUrl(), -path.key); +return do_azure_client_call( +[&]() { +if (auto r = client_->DeleteBlob(path.key); !r.Value.Deleted) { +throw std::runtime_error("Delete azure blob failed"); +} +}, +client_->GetUrl(), path.key); } ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path, diff --git a/cloud/test/mock_accessor.cpp b/cloud/test/mock_accessor.cpp index 9746e64fd30..77a8afe24b1 100644 --- a/cloud/test/mock_accessor.cpp +++ b/cloud/test/mock_accessor.cpp @@ -18,8 +18,6 @@ #include "mock_accessor.h" -#include -#include #include #include -
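The refactor above keeps the deferred response of every DeleteBlob in the batch and verifies each one after SubmitBatch, treating a BlobNotFound error as success. The Azure SDK types are not reproduced here; the sketch below uses std::future as a stand-in for the deferred responses, to show the same submit-then-verify pattern.

```cpp
// Generic sketch of "submit a batch, then verify every deferred per-item result",
// the pattern the Azure batch-delete refactor above applies. std::async stands in
// for the SDK's deferred responses; BlobNotFoundError models the benign 404 case.
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct BlobNotFoundError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Pretend per-blob delete: most keys succeed, "missing" raises not-found,
// "locked" fails hard.
bool delete_one(const std::string& key) {
    if (key == "missing") throw BlobNotFoundError("BlobNotFound: " + key);
    if (key == "locked") return false;
    return true;
}

// Returns 0 on success, -1 if any deletion really failed.
int delete_objects(const std::vector<std::string>& keys) {
    std::vector<std::future<bool>> deferred;
    deferred.reserve(keys.size());
    for (const auto& key : keys) {
        deferred.emplace_back(std::async(std::launch::async, delete_one, key));
    }
    for (size_t i = 0; i < deferred.size(); ++i) {
        try {
            if (!deferred[i].get()) {
                std::cerr << "batch delete failed for key " << keys[i] << "\n";
                return -1;
            }
        } catch (const BlobNotFoundError&) {
            continue;  // deleting an already-absent object is not an error
        } catch (const std::exception& e) {
            std::cerr << "request failed: " << e.what() << "\n";
            return -1;
        }
    }
    return 0;
}

int main() {
    std::cout << delete_objects({"a", "missing", "b"}) << "\n";  // 0: not-found tolerated
    std::cout << delete_objects({"a", "locked"}) << "\n";        // -1: real failure
}
```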
(doris) branch master updated: [Exec](agg) Fix agg limit result error (#37025)
This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 2e6fdc0 [Exec](agg) Fix agg limit result error (#37025) 2e6fdc0 is described below commit 2e6fdc0815021579cbc137f43d7bb6fc2ac7 Author: HappenLee AuthorDate: Mon Jul 1 09:49:04 2024 +0800 [Exec](agg) Fix agg limit result error (#37025) Before merge #34853, should merge the pr firstly --- be/src/pipeline/dependency.cpp | 10 ++ be/src/pipeline/dependency.h | 3 ++- be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 +++- be/src/pipeline/exec/aggregation_source_operator.cpp | 8 +++- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 68c00af409d..4938883062a 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -248,7 +248,8 @@ void AggSharedState::build_limit_heap(size_t hash_table_size) { limit_columns_min = limit_heap.top()._row_id; } -bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows) { +bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows, + const std::vector* key_locs) { if (num_rows) { cmp_res.resize(num_rows); need_computes.resize(num_rows); @@ -257,9 +258,10 @@ bool AggSharedState::do_limit_filter(vectorized::Block* block, size_t num_rows) const auto key_size = null_directions.size(); for (int i = 0; i < key_size; i++) { -block->get_by_position(i).column->compare_internal( -limit_columns_min, *limit_columns[i], null_directions[i], order_directions[i], -cmp_res, need_computes.data()); +block->get_by_position(key_locs ? key_locs->operator[](i) : i) +.column->compare_internal(limit_columns_min, *limit_columns[i], + null_directions[i], order_directions[i], cmp_res, + need_computes.data()); } auto set_computes_arr = [](auto* __restrict res, auto* __restrict computes, int rows) { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 5214022db13..8adc24d3b4e 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -311,7 +311,8 @@ public: Status reset_hash_table(); -bool do_limit_filter(vectorized::Block* block, size_t num_rows); +bool do_limit_filter(vectorized::Block* block, size_t num_rows, + const std::vector* key_locs = nullptr); void build_limit_heap(size_t hash_table_size); // We should call this function only at 1st phase. 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index fae987394b4..1dab1669dd5 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -329,6 +329,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b if (limit) { need_do_agg = _emplace_into_hash_table_limit(_places.data(), block, key_locs, key_columns, rows); +rows = block->rows(); } else { _emplace_into_hash_table(_places.data(), key_columns, rows); } @@ -589,7 +590,8 @@ bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData bool need_filter = false; { SCOPED_TIMER(_hash_table_limit_compute_timer); -need_filter = _shared_state->do_limit_filter(block, num_rows); +need_filter = +_shared_state->do_limit_filter(block, num_rows, &key_locs); } auto& need_computes = _shared_state->need_computes; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 5b371877f36..1b7a151e2af 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -452,8 +452,14 @@ void AggLocalState::do_agg_limit(vectorized::Block* block, bool* eos) { if (_shared_state->reach_limit) { if (_shared_state->do_sort_limit && _shared_state->do_limit_filter(block, block->rows())) { vectorized::Block::filter_block_internal(block, _shared_state->need_computes); +if (auto rows = block->rows()) { +_num_rows_returned += rows; +COUNTER_UPDATE(
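The bug fixed above is that the limit filter compared the wrong columns when the group-by keys are not the leading columns of the block; the fix threads the key locations (key_locs) into do_limit_filter. Below is a much-simplified sketch of that indexing, with the operator's heap reduced to a single threshold row of key values; the types are plain vectors, not Doris columns.

```cpp
// Simplified sketch of do_limit_filter with optional key locations: when the keys
// are not the leading columns of the block, key_locs maps key i -> column index.
#include <iostream>
#include <vector>

using Column = std::vector<int>;
using Block = std::vector<Column>;  // column-oriented, all columns the same length

// Keep a row when its key tuple is lexicographically smaller than the current
// heap-top ("limit_columns_min" in the code above), i.e. it can still enter the top-N.
std::vector<char> limit_filter(const Block& block, const std::vector<int>& threshold,
                               const std::vector<size_t>* key_locs = nullptr) {
    size_t num_rows = block.empty() ? 0 : block.front().size();
    size_t num_keys = threshold.size();
    std::vector<char> keep(num_rows, 0);
    for (size_t row = 0; row < num_rows; ++row) {
        for (size_t k = 0; k < num_keys; ++k) {
            size_t col = key_locs ? (*key_locs)[k] : k;  // the point of the fix: honor key_locs
            int v = block[col][row];
            if (v < threshold[k]) { keep[row] = 1; break; }
            if (v > threshold[k]) { break; }
            if (k + 1 == num_keys) keep[row] = 0;  // equal on every key: not strictly better
        }
    }
    return keep;
}

int main() {
    // Block layout: column 0 is a value column, columns 1..2 are the group-by keys.
    Block block = {{100, 200, 300}, {1, 5, 5}, {9, 2, 7}};
    std::vector<size_t> key_locs = {1, 2};
    auto keep = limit_filter(block, /*threshold=*/{5, 7}, &key_locs);
    for (char c : keep) std::cout << int(c) << ' ';  // prints: 1 1 0
    std::cout << '\n';
}
```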
(doris) branch master updated: [refactor](spill) unify the entry point of spill tasks (#37020)
This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 8fb501b09ef [refactor](spill) unify the entry point of spill tasks (#37020) 8fb501b09ef is described below commit 8fb501b09efd77a299ab3405ff25f7a652f14b7a Author: Jerry Hu AuthorDate: Mon Jul 1 09:52:32 2024 +0800 [refactor](spill) unify the entry point of spill tasks (#37020) --- .../exec/partitioned_aggregation_sink_operator.cpp | 30 ++ .../partitioned_aggregation_source_operator.cpp| 29 ++ .../exec/partitioned_aggregation_source_operator.h | 1 - .../exec/partitioned_hash_join_probe_operator.cpp | 76 +++--- .../exec/partitioned_hash_join_probe_operator.h| 4 +- .../exec/partitioned_hash_join_sink_operator.cpp | 112 +++-- .../exec/partitioned_hash_join_sink_operator.h | 3 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 30 +- .../pipeline/exec/spill_sort_source_operator.cpp | 27 + be/src/pipeline/exec/spill_sort_source_operator.h | 1 - be/src/pipeline/exec/spill_utils.h | 76 ++ 11 files changed, 152 insertions(+), 237 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index b833289e0e0..4399f3c7045 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -22,6 +22,7 @@ #include "aggregation_sink_operator.h" #include "common/status.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "vec/spill/spill_stream_manager.h" @@ -253,14 +254,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { } }}; -auto execution_context = state->get_task_execution_context(); -/// Resources in shared state will be released when the operator is closed, -/// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. -/// So, we need hold the pointer of shared state. 
-std::weak_ptr shared_state_holder = -_shared_state->shared_from_this(); auto query_id = state->query_id(); -auto mem_tracker = state->get_query_ctx()->query_mem_tracker; MonotonicStopWatch submit_timer; submit_timer.start(); @@ -269,20 +263,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { "fault_inject partitioned_agg_sink revoke_memory submit_func failed"); return status; }); -status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( -[this, &parent, state, query_id, mem_tracker, shared_state_holder, execution_context, - submit_timer] { -SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); -std::shared_ptr execution_context_lock; -auto shared_state_sptr = shared_state_holder.lock(); -if (shared_state_sptr) { -execution_context_lock = execution_context.lock(); -} -if (!shared_state_sptr || !execution_context_lock) { -LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; -return Status::Cancelled("Cancelled"); -} + +auto spill_runnable = std::make_shared( +state, _shared_state->shared_from_this(), +[this, &parent, state, query_id, submit_timer] { DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", { auto st = Status::InternalError( "fault_inject partitioned_agg_sink " @@ -332,7 +316,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { parent._agg_sink_operator->reset_hash_table(runtime_state); return Base::_shared_state->sink_status; }); -return status; + +return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( +std::move(spill_runnable)); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index fd609d95eef..a8c4e7b0bcc 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -23,6 +23,7 @@ #include "common/exception.h" #include "common/status.h" #include "pipeline/exec/operator.h" +#include "pipeline/exec/spill_utils.h" #include "runtime/fragment_mgr.h" #include "util/runtime_profile.h" #
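The refactor above funnels every spill task through one runnable type that pins the operator's shared state for as long as the asynchronous work runs, replacing the per-call-site weak_ptr/lock boilerplate visible in the removed lines. A minimal sketch of the idea follows, with placeholder SharedState and Status types and std::async standing in for the spill IO thread pool; the real class lives in be/src/pipeline/exec/spill_utils.h.

```cpp
// Minimal sketch of a unified spill-task entry point: the runnable owns a
// shared_ptr to the operator's shared state so the async work cannot race
// with operator close. SharedState, Status and the "pool" are placeholders.
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

using Status = std::string;  // "" means OK in this sketch

struct SharedState {
    std::vector<int> rows_to_spill = {1, 2, 3};
};

class SpillRunnable {
public:
    SpillRunnable(std::shared_ptr<SharedState> state, std::function<Status()> work)
            : _state(std::move(state)), _work(std::move(work)) {}

    // Single entry point: every spill task funnels through here.
    Status run() {
        if (!_state) return "Cancelled: shared state released";
        return _work();
    }

private:
    std::shared_ptr<SharedState> _state;  // keeps resources alive for the async task
    std::function<Status()> _work;
};

int main() {
    auto state = std::make_shared<SharedState>();
    auto task = std::make_shared<SpillRunnable>(state, [state]() -> Status {
        std::cout << "spilling " << state->rows_to_spill.size() << " rows\n";
        return "";
    });
    // Stand-in for spill_io_thread_pool->submit(std::move(spill_runnable)).
    auto fut = std::async(std::launch::async, [task] { return task->run(); });
    Status st = fut.get();
    std::cout << (st.empty() ? "OK" : st) << "\n";
}
```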
(doris) branch branch-2.1 updated: [branch-2.1] PIck "[Fix](autoinc) Hanlde the processing of auto_increment column on exchange node rather than on TabletWriter when using TABLET_SINK_SHUFFLE_PARTITIO
This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4210a6a8d6e [branch-2.1] PIck "[Fix](autoinc) Hanlde the processing of auto_increment column on exchange node rather than on TabletWriter when using TABLET_SINK_SHUFFLE_PARTITIONED #36836" (#37029) 4210a6a8d6e is described below commit 4210a6a8d6e3590dbe2fdea81b8027c1d08768bc Author: bobhan1 AuthorDate: Mon Jul 1 09:56:30 2024 +0800 [branch-2.1] PIck "[Fix](autoinc) Hanlde the processing of auto_increment column on exchange node rather than on TabletWriter when using TABLET_SINK_SHUFFLE_PARTITIONED #36836" (#37029) ## Proposed changes pick https://github.com/apache/doris/pull/36836 --- be/src/pipeline/exec/exchange_sink_operator.cpp| 5 +- be/src/vec/sink/vtablet_block_convertor.cpp| 5 +- be/src/vec/sink/writer/vtablet_writer.cpp | 2 + be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 + .../unique/test_unique_table_auto_inc.out | 62 ++ .../unique/test_unique_table_auto_inc.groovy | 38 + 6 files changed, 110 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 33b68a5ac30..5832a3695b8 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -254,9 +254,12 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::make_unique(_vpartition.get(), find_tablet_mode); _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); -//_block_convertor no need init_autoinc_info here +// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column +// on exchange node rather than on TabletWriter _block_convertor = std::make_unique(_tablet_sink_tuple_desc); +_block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(), +_state->batch_size()); _location = p._pool->add(new OlapTableLocationParam(p._tablet_sink_location)); _row_distribution.init( {.state = _state, diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index d93a654728d..4446e44f431 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -494,8 +494,7 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si vectorized::ColumnInt64::Container& dst_values = dst_column->get_data(); vectorized::ColumnPtr src_column_ptr = block->get_by_position(idx).column; -if (const vectorized::ColumnConst* const_column = -check_and_get_column(src_column_ptr)) { +if (const auto* const_column = check_and_get_column(src_column_ptr)) { // for insert stmt like "insert into tbl1 select null,col1,col2,... from tbl2" or // "insert into tbl1 select 1,col1,col2,... 
from tbl2", the type of literal's column // will be `ColumnConst` @@ -518,7 +517,7 @@ Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si int64_t value = const_column->get_int(0); dst_values.resize_fill(rows, value); } -} else if (const vectorized::ColumnNullable* src_nullable_column = +} else if (const auto* src_nullable_column = check_and_get_column(src_column_ptr)) { auto src_nested_column_ptr = src_nullable_column->get_nested_column_ptr(); const auto& null_map_data = src_nullable_column->get_null_map_data(); diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index f385575e7c0..c0f5bd8abc1 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1182,6 +1182,8 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { } _block_convertor = std::make_unique(_output_tuple_desc); +// if partition_type is TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column +// on exchange node rather than on TabletWriter _block_convertor->init_autoinc_info( _schema->db_id(), _schema->table_id(), _state->batch_size(), _schema->is_partial_update() && !_schema->auto_increment_coulumn().empty(), diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 1b14a57d154..c1c6e1cfc86 100644 --- a/be/s
(doris) branch master updated: [chore](Regression) Remove useless get provider code in regression framework (#37000)
This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 0babde17bf2 [chore](Regression) Remove useless get provider code in regression framework (#37000) 0babde17bf2 is described below commit 0babde17bf248dedff7a9cc7f61dc43506322187 Author: AlexYue AuthorDate: Mon Jul 1 09:59:48 2024 +0800 [chore](Regression) Remove useless get provider code in regression framework (#37000) The following get provider and check logic code is useless. --- .../main/groovy/org/apache/doris/regression/Config.groovy | 13 + .../groovy/org/apache/doris/regression/suite/Suite.groovy | 15 --- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 008962ee544..c6711184c01 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -564,16 +564,6 @@ class Config { return config } -static String getProvider(String endpoint) { -def providers = ["cos", "oss", "s3", "obs", "bos"] -for (final def provider in providers) { -if (endpoint.containsIgnoreCase(provider)) { -return provider -} -} -return "" -} - static void checkCloudSmokeEnv(Properties properties) { // external stage obj info String s3Endpoint = properties.getOrDefault("s3Endpoint", "") @@ -589,8 +579,7 @@ class Config { s3EndpointConf:s3Endpoint, s3BucketConf:s3BucketName, s3AKConf:s3AK, -s3SKConf:s3SK, -s3ProviderConf:getProvider(s3Endpoint) +s3SKConf:s3SK ] for (final def item in items) { if (item.value == null || item.value.isEmpty()) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index b3a2e958ff0..3397ab4ccfc 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -894,21 +894,6 @@ class Suite implements GroovyInterceptable { return; } -String getProvider() { -String s3Endpoint = context.config.otherConfigs.get("s3Endpoint") -return getProvider(s3Endpoint) -} - -String getProvider(String endpoint) { -def providers = ["cos", "oss", "s3", "obs", "bos"] -for (final def provider in providers) { -if (endpoint.containsIgnoreCase(provider)) { -return provider -} -} -return "" -} - int getTotalLine(String filePath) { def file = new File(filePath) int lines = 0; - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (0babde17bf2 -> d42e57f0283)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 0babde17bf2 [chore](Regression) Remove useless get provider code in regression framework (#37000) add d42e57f0283 [regression-test](connector) Add a case for the response of streamload that the connector depends (#36864) No new revisions were added by this update. Summary of changes: .../data/flink_connector_p0/test_response.csv | 2 + .../flink_connector_response.groovy| 186 + 2 files changed, 188 insertions(+) create mode 100644 regression-test/data/flink_connector_p0/test_response.csv create mode 100644 regression-test/suites/flink_connector_p0/flink_connector_response.groovy - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated: [fix](local shuffle) Fix wrong partitioned expr in local exchanger (#37017)
This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 572c2b89883 [fix](local shuffle) Fix wrong partitioned expr in local exchanger (#37017) 572c2b89883 is described below commit 572c2b89883f7dcb56053bb6473a4ddc25a2e14d Author: Gabriel AuthorDate: Mon Jul 1 10:02:03 2024 +0800 [fix](local shuffle) Fix wrong partitioned expr in local exchanger (#37017) Now partitioned expressions in HASH-SHUFFLE local exchanger may be wrong. This PR fix it. --- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++-- be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index d785c20ee7f..fad03f0a78d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -148,8 +148,8 @@ public: return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { -return _join_distribution == TJoinDistributionType::COLOCATE || - _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; +return _join_distribution != TJoinDistributionType::BROADCAST && + _join_distribution != TJoinDistributionType::NONE; } private: diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index b8bc892ef31..0b4298f55ff 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -159,8 +159,8 @@ public: return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { -return _join_distribution == TJoinDistributionType::COLOCATE || - _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; +return _join_distribution != TJoinDistributionType::BROADCAST && + _join_distribution != TJoinDistributionType::NONE; } private: - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch branch-2.0 updated: [fix](bitmap) incorrect type of BitmapValue with fastunion (#36834) (#36898)
This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.0 by this push: new c44d1582a1e [fix](bitmap) incorrect type of BitmapValue with fastunion (#36834) (#36898) c44d1582a1e is described below commit c44d1582a1eb2c310358c688e4d00d6bc509bdaa Author: Jerry Hu AuthorDate: Mon Jul 1 10:19:07 2024 +0800 [fix](bitmap) incorrect type of BitmapValue with fastunion (#36834) (#36898) ## Proposed changes pick #36834 --- be/src/util/bitmap_value.h | 5 +++-- be/test/util/bitmap_value_test.cpp | 23 +++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/be/src/util/bitmap_value.h b/be/src/util/bitmap_value.h index f75ca59bae6..9229945f529 100644 --- a/be/src/util/bitmap_value.h +++ b/be/src/util/bitmap_value.h @@ -1662,7 +1662,6 @@ public: case SINGLE: { _set.insert(_sv); _type = SET; -_convert_to_bitmap_if_need(); break; } case BITMAP: @@ -1673,10 +1672,12 @@ public: _type = BITMAP; break; case SET: { -_convert_to_bitmap_if_need(); break; } } +if (_type == SET) { +_convert_to_bitmap_if_need(); +} } if (_type == EMPTY && single_values.size() == 1) { diff --git a/be/test/util/bitmap_value_test.cpp b/be/test/util/bitmap_value_test.cpp index 6524a8152c5..d536e2e581e 100644 --- a/be/test/util/bitmap_value_test.cpp +++ b/be/test/util/bitmap_value_test.cpp @@ -124,6 +124,29 @@ TEST(BitmapValueTest, bitmap_union) { EXPECT_EQ(3, bitmap3.cardinality()); bitmap3.fastunion({&bitmap}); EXPECT_EQ(5, bitmap3.cardinality()); + +const auto old_config = config::enable_set_in_bitmap_value; +config::enable_set_in_bitmap_value = true; +BitmapValue bitmap4; // empty + +BitmapValue bitmap_set1; +BitmapValue bitmap_set2; +BitmapValue bitmap_set3; + +const int set_data1[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; +bitmap_set1.add_many(set_data1, 15); + +const int set_data2[] = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30}; +bitmap_set2.add_many(set_data2, 15); + +const int set_data3[] = {31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45}; +bitmap_set3.add_many(set_data3, 15); + +bitmap4.fastunion({&bitmap_set1, &bitmap_set2, &bitmap_set3}); + +EXPECT_EQ(bitmap4.cardinality(), 45); +EXPECT_EQ(bitmap4.get_type_code(), BitmapTypeCode::BITMAP32); +config::enable_set_in_bitmap_value = old_config; } TEST(BitmapValueTest, bitmap_intersect) { - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
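The fix above makes fastunion re-check the set-to-bitmap conversion after merging, so a result that grows past the in-set size limit carries the bitmap type code (the new test asserts BITMAP32 for a 45-element union). The sketch below shows that promote-after-merge pattern; SmallValue and the threshold are illustrative stand-ins, the real class is BitmapValue backed by a Roaring bitmap.

```cpp
// Sketch of the SET -> BITMAP promotion fixed above: decide the representation
// *after* merging all inputs, based on the merged size, not per input.
#include <cstdint>
#include <iostream>
#include <set>
#include <vector>

class SmallValue {
public:
    enum Type { EMPTY, SET, BITMAP };
    static constexpr size_t kSetThreshold = 32;  // stand-in for the in-set size limit

    void add(uint64_t v) {
        _set.insert(v);
        _type = SET;
        convert_if_needed();
    }

    void fast_union(const std::vector<const SmallValue*>& others) {
        for (const SmallValue* o : others) {
            _set.insert(o->_set.begin(), o->_set.end());
        }
        if (!_set.empty()) _type = SET;
        // The bug: converting only inside individual branches left the tag as SET
        // even when the merged result was large. Convert once, after the whole union.
        convert_if_needed();
    }

    size_t cardinality() const { return _set.size(); }
    Type type() const { return _type; }

private:
    void convert_if_needed() {
        if (_type == SET && _set.size() > kSetThreshold) _type = BITMAP;
    }

    std::set<uint64_t> _set;  // the sketch keeps one container; only the tag changes
    Type _type = EMPTY;
};

int main() {
    SmallValue a, b, c, out;
    for (uint64_t v = 0; v < 15; ++v) { a.add(v); b.add(v + 15); c.add(v + 30); }
    out.fast_union({&a, &b, &c});
    std::cout << "cardinality=" << out.cardinality()
              << " type=" << (out.type() == SmallValue::BITMAP ? "BITMAP" : "SET") << "\n";
}
```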
(doris-spark-connector) branch master updated: [Improve] decrease memory usage when csv&gzip is on (#212)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 3e745e7 [Improve] decrease memory usage when csv&gzip is on (#212) 3e745e7 is described below commit 3e745e732fdade8a26856bd92026b44fd02d2787 Author: zhaorongsheng AuthorDate: Mon Jul 1 10:20:40 2024 +0800 [Improve] decrease memory usage when csv&gzip is on (#212) Co-authored-by: zhaorongsheng --- .../org/apache/doris/spark/load/StreamLoader.scala | 33 +++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 9481b6f..06bb56f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -20,6 +20,7 @@ package org.apache.doris.spark.load import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.json.JsonMapper +import org.apache.commons.io.IOUtils import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} @@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} -import java.io.{ByteArrayOutputStream, IOException} +import java.io.{ByteArrayOutputStream, IOException, InputStream} import java.net.{HttpURLConnection, URL} import java.nio.charset.StandardCharsets import java.util @@ -375,14 +376,13 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader if (compressType.nonEmpty) { if ("gz".equalsIgnoreCase(compressType.get) && format == DataFormat.CSV) { -val recordBatchString = new RecordBatchString(RecordBatch.newBuilder(iterator.asJava) +val recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(iterator.asJava) .format(format) .sep(columnSeparator) .delim(lineDelimiter) .schema(schema) .addDoubleQuotes(addDoubleQuotes).build, streamingPassthrough) -val content = recordBatchString.getContent -val compressedData = compressByGZ(content) +val compressedData = compressByGZ(recodeBatchInputStream) entity = Some(new ByteArrayEntity(compressedData)) } else { @@ -457,6 +457,31 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader compressedData } + /** + * compress data by gzip + * + * @param contentInputStream data content + * @throws + * @return compressed byte array data + */ + @throws[IOException] + def compressByGZ(contentInputStream: InputStream): Array[Byte] = { +var compressedData: Array[Byte] = null +try { + val baos = new ByteArrayOutputStream + val gzipOutputStream = new GZIPOutputStream(baos) + try { +IOUtils.copy(contentInputStream, gzipOutputStream) +gzipOutputStream.finish() +compressedData = baos.toByteArray + } finally { +if (baos != null) baos.close() +if (gzipOutputStream != null) gzipOutputStream.close() + } +} +compressedData + } + /** * handle stream load response * - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
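The improvement above streams the record batch into the gzip encoder instead of first materializing the whole CSV payload as one string, so peak memory stays bounded by the buffer size. The connector itself is Scala (GZIPOutputStream plus IOUtils.copy); the sketch below shows the same chunked-compression idea in C++ using zlib, built with -lz.

```cpp
// C++ sketch of the memory win above: feed the payload to the gzip encoder in
// chunks instead of first concatenating everything into one big string.
// Build with: g++ -std=c++17 gzip_stream.cpp -lz
#include <zlib.h>

#include <iostream>
#include <string>
#include <vector>

// Compress a sequence of record chunks into one gzip member.
std::vector<unsigned char> gzip_chunks(const std::vector<std::string>& chunks) {
    z_stream strm{};  // zero-init sets zalloc/zfree/opaque to null as zlib requires
    // windowBits 15 + 16 asks zlib for a gzip (not raw deflate) header and trailer.
    if (deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8,
                     Z_DEFAULT_STRATEGY) != Z_OK) {
        return {};
    }
    std::vector<unsigned char> out;
    unsigned char buf[16 * 1024];
    for (size_t i = 0; i < chunks.size(); ++i) {
        strm.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(chunks[i].data()));
        strm.avail_in = static_cast<uInt>(chunks[i].size());
        int flush = (i + 1 == chunks.size()) ? Z_FINISH : Z_NO_FLUSH;
        do {
            strm.next_out = buf;
            strm.avail_out = sizeof(buf);
            deflate(&strm, flush);  // only one `buf` worth of output is live at a time
            out.insert(out.end(), buf, buf + (sizeof(buf) - strm.avail_out));
        } while (strm.avail_out == 0);
    }
    deflateEnd(&strm);
    return out;
}

int main() {
    std::vector<std::string> rows(10000, "1,foo,2024-07-01\n");  // stand-in record batch
    auto gz = gzip_chunks(rows);
    std::cout << "compressed " << rows.size() * rows.front().size() << " bytes into "
              << gz.size() << " bytes\n";
}
```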
(doris-flink-connector) branch master updated: [improve] support group commit (#412)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 43a055a9 [improve] support group commit (#412) 43a055a9 is described below commit 43a055a9e6b4c728912725976eedbecdfb8b270c Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 1 10:31:15 2024 +0800 [improve] support group commit (#412) --- .../java/org/apache/doris/flink/sink/HttpPutBuilder.java | 4 +++- .../apache/doris/flink/sink/batch/DorisBatchStreamLoad.java | 6 ++ .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 12 +++- .../org/apache/doris/flink/sink/writer/LoadConstants.java| 1 + .../apache/doris/flink/sink/copy/TestDorisCopyWriter.java| 3 +++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java index 023cd31a..44f6c9fe 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java @@ -111,7 +111,9 @@ public class HttpPutBuilder { } public HttpPutBuilder setLabel(String label) { -header.put("label", label); +if (label != null) { +header.put("label", label); +} return this; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index d9fba749..fbc6daa0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -67,6 +67,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -95,6 +96,7 @@ public class DorisBatchStreamLoad implements Serializable { private AtomicReference exception = new AtomicReference<>(null); private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch(); private BackendUtil backendUtil; +private boolean enableGroupCommit; public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -120,6 +122,7 @@ public class DorisBatchStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } +this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -260,6 +263,9 @@ public class DorisBatchStreamLoad implements Serializable { /** execute stream load. 
*/ public void load(String label, BatchRecordBuffer buffer) throws IOException { +if (enableGroupCommit) { +label = null; +} refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); ByteBuffer data = buffer.getData(); ByteArrayEntity entity = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 14e44dee..676de3df 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -87,6 +88,7 @@ public class DorisStreamLoad impleme
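The connector change above drops the per-load label when group commit is enabled (setLabel simply ignores a null label), since a group-commit stream load is not identified by an individual label. A small sketch of building the stream-load headers conditionally follows; the helper name, the header set, and the async_mode value are assumptions for illustration rather than details taken from this patch.

```cpp
// Sketch of the header logic above: when group commit is on, no per-load label
// is attached (mirroring HttpPutBuilder.setLabel skipping a null label).
// build_stream_load_headers and the exact header values are illustrative.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>

using HeaderMap = std::map<std::string, std::string>;

HeaderMap build_stream_load_headers(bool enable_group_commit, const std::string& label_prefix,
                                    int64_t chunk_id) {
    HeaderMap headers = {{"format", "csv"}, {"Expect", "100-continue"}};
    std::optional<std::string> label;
    if (!enable_group_commit) {
        label = label_prefix + "_" + std::to_string(chunk_id);
    }
    if (label) {
        headers["label"] = *label;
    } else {
        // Assumed example value; in practice the group_commit property comes from user config.
        headers["group_commit"] = "async_mode";
    }
    return headers;
}

int main() {
    for (bool gc : {false, true}) {
        std::cout << (gc ? "group commit:" : "labeled:") << "\n";
        for (const auto& [k, v] : build_stream_load_headers(gc, "flink_job_1", 7)) {
            std::cout << "  " << k << ": " << v << "\n";
        }
    }
}
```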
(doris) branch master updated: [fix](ES Catalog)Add array types support in esquery function (#36936)
This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 64a78a0bcc3 [fix](ES Catalog)Add array types support in esquery function (#36936) 64a78a0bcc3 is described below commit 64a78a0bcc3b03e25ee8fffc43f32061c2f4c45a Author: qiye AuthorDate: Mon Jul 1 10:46:43 2024 +0800 [fix](ES Catalog)Add array types support in esquery function (#36936) Support array types in `esquery` function, and add some tests. --- .../expressions/functions/scalar/EsQuery.java | 4 ++- gensrc/script/doris_builtins_functions.py | 2 +- .../data/external_table_p0/es/test_es_query.out| 42 +++--- .../external_table_p0/es/test_es_query.groovy | 10 -- 4 files changed, 50 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EsQuery.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EsQuery.java index a5fbd339c9f..28a6988bca4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EsQuery.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EsQuery.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.shape.BinaryExpression; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.BooleanType; import org.apache.doris.nereids.types.VarcharType; +import org.apache.doris.nereids.types.coercion.AnyDataType; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -38,7 +39,8 @@ public class EsQuery extends ScalarFunction implements BinaryExpression, ExplicitlyCastableSignature, PropagateNullable { public static final List SIGNATURES = ImmutableList.of( - FunctionSignature.ret(BooleanType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT, VarcharType.SYSTEM_DEFAULT) + FunctionSignature.ret(BooleanType.INSTANCE).args(AnyDataType.INSTANCE_WITHOUT_INDEX, +VarcharType.SYSTEM_DEFAULT) ); /** diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 1ce8127f17e..81c502d301d 100644 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1582,7 +1582,7 @@ visible_functions = { [['esquery'], 'BOOLEAN', ['DATEV2', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['DATETIMEV2', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['TIMEV2', 'VARCHAR'], ''], -[['esquery'], 'BOOLEAN', ['ARRAY', 'VARCHAR'], ''], +[['esquery'], 'BOOLEAN', ['ARRAY', 'VARCHAR'], '', ['T']], [['esquery'], 'BOOLEAN', ['MAP', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['STRING', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['VARIANT', 'VARCHAR'], ''], diff --git a/regression-test/data/external_table_p0/es/test_es_query.out b/regression-test/data/external_table_p0/es/test_es_query.out index 605e2f1aa93..d751719389f 100644 --- a/regression-test/data/external_table_p0/es/test_es_query.out +++ b/regression-test/data/external_table_p0/es/test_es_query.out @@ -1,9 +1,9 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this -- !sql01 -- -["2020-01-01", "2020-01-02"] [-1, 0, 1, 2] [0, 1, 2, 3]["d", "e", "f"] [128, 129, -129, -130] ["192.168.0.1", "127.0.0.1"]string1 [1, 2, 3, 4] 2022-08-08 2022-08-08T12:10:10 text#1 ["2020-01-01", "2020-01-02"] 3.14[1, 2, 3, 4][1, 1.1, 1.2, 1.3] [1, 2, 3, 4]["a", "b", "c"] ["{"name":"Andy","age":18}", "{"name":"Tim","age":28}"] 2022-08-08T12:10:10 2022-08-08T12:10:10 2022-08-08T20:10:10 [1, -2, -3, 4] [1, 0, 1, 1] [32768, 32769, -32769, -32770] \N +["2020-01-01 12:00:00", "2020-01-02 13:01:01"] [-1, 0, 1, 2] [0, 1, 2, 3] ["d", "e", "f"] [128, 129, -129, -130] ["192.168.0.1", "127.0.0.1"]string1 [1, 2, 3, 4]2022-08-08 2022-08-08T12:10:10 text#1 ["2020-01-01", "2020-01-02"]3.14[1, 2, 3, 4][1, 1.1, 1.2, 1.3] [1, 2, 3, 4] ["a", "b", "c"] ["{"name":"Andy","age":18}", "{"name":"Tim","age":28}"] 2022-08-08T12:10:10 2022-08-08T12:10:10 2022-08-08T20:10:10 [1, -2, -3, 4] [1, 0, 1, 1][32768, 32769, -32769, -32770] \N -- !sql02 -- -["2020-01-01", "2020-01-02"] [-1, 0, 1, 2] [0, 1, 2, 3]["d", "e", "f"] [128, 129, -129, -130] ["192.168.0.1", "127.0.0.1"]string1 [1, 2, 3, 4] 2022-08-08 2022-08-08T12:10:10 text#1 ["2020-01-01", "2020-01-02"] 3.14[1, 2, 3, 4][1, 1.1, 1.2, 1.3] [1, 2, 3, 4]["a", "b", "c"] ["{"name":"Andy","age":18}", "{"name":"Tim","age":
(doris) branch master updated: [regression-test](case) modify statistics table name (#36689)
This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 86defede17d [regression-test](case) modify statistics table name (#36689) 86defede17d is described below commit 86defede17d8e2225db273a26bef3c0ec16b1cb0 Author: shuke <37901441+shuke...@users.noreply.github.com> AuthorDate: Mon Jul 1 11:23:13 2024 +0800 [regression-test](case) modify statistics table name (#36689) --- regression-test/suites/pipeline_p0/statitics_compaction.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/regression-test/suites/pipeline_p0/statitics_compaction.groovy b/regression-test/suites/pipeline_p0/statitics_compaction.groovy index eaf723c763c..aebe55939d5 100644 --- a/regression-test/suites/pipeline_p0/statitics_compaction.groovy +++ b/regression-test/suites/pipeline_p0/statitics_compaction.groovy @@ -66,5 +66,5 @@ suite("statistic_table_compaction", "nonConcurrent,p0") { } do_compaction("__internal_schema.column_statistics") -do_compaction("__internal_schema.histogram_statistics") -} \ No newline at end of file +do_compaction("__internal_schema.partition_statistics") +} - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (86defede17d -> 6c707828766)
This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 86defede17d [regression-test](case) modify statistics table name (#36689) add 6c707828766 [feature](function) support ip functions named ipv4_to_ipv6 and cut_ipv6 (#36883) No new revisions were added by this update. Summary of changes: be/src/vec/functions/function_ip.cpp | 6 + be/src/vec/functions/function_ip.h | 137 + be/test/vec/function/function_ip_test.cpp | 75 +++ be/test/vec/function/function_test_util.cpp| 16 +++ be/test/vec/function/function_test_util.h | 3 + .../doris/catalog/BuiltinScalarFunctions.java | 16 ++- .../scalar/{Replace.java => CutIpv6.java} | 33 +++-- .../scalar/{Acos.java => Ipv4ToIpv6.java} | 31 +++-- .../expressions/visitor/ScalarFunctionVisitor.java | 58 + gensrc/script/doris_builtins_functions.py | 2 + .../ip_functions/test_cut_ipv6_function.out| 19 +++ .../ip_functions/test_ipv4_to_ipv6_function.out| 14 +++ .../ip_functions/test_cut_ipv6_function.groovy | 56 + .../test_ipv4_to_ipv6_function.groovy} | 39 +++--- 14 files changed, 422 insertions(+), 83 deletions(-) copy fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/{Replace.java => CutIpv6.java} (69%) copy fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/{Acos.java => Ipv4ToIpv6.java} (72%) create mode 100644 regression-test/data/query_p0/sql_functions/ip_functions/test_cut_ipv6_function.out create mode 100644 regression-test/data/query_p0/sql_functions/ip_functions/test_ipv4_to_ipv6_function.out create mode 100644 regression-test/suites/query_p0/sql_functions/ip_functions/test_cut_ipv6_function.groovy copy regression-test/suites/{datatype_p0/ip/test_ip_implicit_cast.groovy => query_p0/sql_functions/ip_functions/test_ipv4_to_ipv6_function.groovy} (60%) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
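For readers who want the byte-level picture of the two new functions: ipv4_to_ipv6 places the 32-bit address into the low bytes of an IPv4-mapped IPv6 address (the ::ffff:a.b.c.d form), and cut_ipv6 truncates an address by zeroing trailing bytes before formatting. The sketch below models both; the exact semantics of the real cut_ipv6 (separate byte counts for IPv6 and IPv4-mapped inputs, RFC 5952 text output) are simplified here.

```cpp
// Byte-level sketch of the two new functions: ipv4_to_ipv6 as the standard
// IPv4-mapped form, and a cut operation modeled as zeroing the lowest N bytes
// of the 16-byte address. Formatting here is plain hex groups, not RFC 5952.
#include <array>
#include <cstdint>
#include <cstdio>

using IPv6 = std::array<uint8_t, 16>;

IPv6 ipv4_to_ipv6(uint32_t ipv4) {
    IPv6 v6{};                       // bytes 0..9 stay zero
    v6[10] = 0xff;
    v6[11] = 0xff;                   // ::ffff: prefix of an IPv4-mapped address
    v6[12] = static_cast<uint8_t>(ipv4 >> 24);
    v6[13] = static_cast<uint8_t>(ipv4 >> 16);
    v6[14] = static_cast<uint8_t>(ipv4 >> 8);
    v6[15] = static_cast<uint8_t>(ipv4);
    return v6;
}

IPv6 cut_ipv6(IPv6 addr, size_t bytes_to_cut) {
    for (size_t i = 0; i < bytes_to_cut && i < addr.size(); ++i) {
        addr[15 - i] = 0;            // zero from the least-significant byte upward
    }
    return addr;
}

void print(const IPv6& a) {
    for (size_t i = 0; i < 16; i += 2) {
        std::printf("%02x%02x%s", unsigned(a[i]), unsigned(a[i + 1]), i == 14 ? "\n" : ":");
    }
}

int main() {
    uint32_t ip = (192u << 24) | (168u << 16) | (0u << 8) | 1u;  // 192.168.0.1
    IPv6 mapped = ipv4_to_ipv6(ip);
    print(mapped);                 // ...:0000:ffff:c0a8:0001
    print(cut_ipv6(mapped, 2));    // last two bytes zeroed
}
```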
(doris-website) branch master updated: [doc](duplicate) Fix Doc Spelling Mistake (#802)
This is an automated email from the ASF dual-hosted git repository. luzhijing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-website.git The following commit(s) were added to refs/heads/master by this push: new 7698cbcbca [doc](duplicate) Fix Doc Spelling Mistake (#802) 7698cbcbca is described below commit 7698cbcbca25f74426e0b00715ac224888be8204 Author: Wanghuan AuthorDate: Mon Jul 1 11:54:07 2024 +0800 [doc](duplicate) Fix Doc Spelling Mistake (#802) Co-authored-by: Luzhijing <82810928+luzhij...@users.noreply.github.com> --- .../current/table-design/data-model/duplicate.md | 8 .../version-2.0/table-design/data-model/duplicate.md | 8 .../version-2.1/table-design/data-model/duplicate.md | 8 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/table-design/data-model/duplicate.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/table-design/data-model/duplicate.md index 4571730cc5..042b493270 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/table-design/data-model/duplicate.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/table-design/data-model/duplicate.md @@ -24,11 +24,11 @@ specific language governing permissions and limitations under the License. --> -明细模型,也成为 Duplicate 数据模型。 +明细模型,也称为 Duplicate 数据模型。 在某些多维分析场景下,数据既没有主键,也没有聚合需求。针对这种需求,可以使用明细数据模型。 -在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明数据存储按照哪些列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 +在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 Duplicate Key,只是用来指明数据存储按照哪些列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 举例如下,一个表有如下的数据列,没有主键更新和基于聚合键的聚合需求。 @@ -116,7 +116,7 @@ MySQL > desc example_tbl_duplicate_without_keys_by_default; ## 指定排序列的明细模型 -在建表语句中指定 DUPLICATE KEY,用来指明数据存储按照这些 Key 列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 +在建表语句中指定 Duplicate Key,用来指明数据存储按照这些 Key 列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 建表语句举例如下,指定了按照 timestamp、type 和 error_code 三列进行排序。 @@ -150,4 +150,4 @@ MySQL > desc example_tbl_duplicate; 6 rows in set (0.01 sec) ``` -数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明数据存储按照哪些列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 \ No newline at end of file +数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 Duplicate Key,只是用来指明数据存储按照哪些列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/table-design/data-model/duplicate.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/table-design/data-model/duplicate.md index 4571730cc5..042b493270 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/table-design/data-model/duplicate.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/table-design/data-model/duplicate.md @@ -24,11 +24,11 @@ specific language governing permissions and limitations under the License. 
--> -明细模型,也成为 Duplicate 数据模型。 +明细模型,也称为 Duplicate 数据模型。 在某些多维分析场景下,数据既没有主键,也没有聚合需求。针对这种需求,可以使用明细数据模型。 -在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明数据存储按照哪些列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 +在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 Duplicate Key,只是用来指明数据存储按照哪些列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 举例如下,一个表有如下的数据列,没有主键更新和基于聚合键的聚合需求。 @@ -116,7 +116,7 @@ MySQL > desc example_tbl_duplicate_without_keys_by_default; ## 指定排序列的明细模型 -在建表语句中指定 DUPLICATE KEY,用来指明数据存储按照这些 Key 列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 +在建表语句中指定 Duplicate Key,用来指明数据存储按照这些 Key 列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 建表语句举例如下,指定了按照 timestamp、type 和 error_code 三列进行排序。 @@ -150,4 +150,4 @@ MySQL > desc example_tbl_duplicate; 6 rows in set (0.01 sec) ``` -数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明数据存储按照哪些列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 \ No newline at end of file +数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 Duplicate Key,只是用来指明数据存储按照哪些列进行排序。在 Duplicate Key 的选择上,建议选择前 2-4 列即可。 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/table-design/data-model/duplicate.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/table-design/data-model/duplicate.md index 4571730cc5..042b493270 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/table-design/data-model/duplicate.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/table-design/data-model/duplicate.md @@ -24,11 +24,11 @@ specific language governing permissions and limitations under the License. --> -明细模型,也成为 Duplicate 数据模型。 +明细模型,也称为 Duplicate 数据模型。 在某些多维分析场景下,数据既没有主键,也没有聚合需求。针对这种需求,可以使用明细数据模型。 -在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 DUPLICATE KEY,只是用来指明数据存储按照哪些列进行排序。在 DUPLICATE KEY 的选择上,建议选择前 2-4 列即可。 +在明细数据模型中,数据按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。而在建表语句中指定的 Duplicate Key,只
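For readers following the corrected wording above: the Duplicate Key clause only declares the sort columns; it does not deduplicate or aggregate. Below is a minimal sketch of a table of the shape the doc describes, sorted by its first three columns. The column list, bucket count, and replication property are illustrative placeholders, not the doc's verbatim example.

CREATE TABLE example_tbl_duplicate
(
    `timestamp`  DATETIME      NOT NULL COMMENT "log time",
    `type`       INT           NOT NULL COMMENT "log type",
    `error_code` INT           COMMENT "error code",
    `error_msg`  VARCHAR(1024) COMMENT "error message"
)
DUPLICATE KEY(`timestamp`, `type`, `error_code`)  -- sort columns only; identical rows are all kept
DISTRIBUTED BY HASH(`type`) BUCKETS 1             -- placeholder distribution
PROPERTIES ("replication_allocation" = "tag.location.default: 1");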
Error while running notifications feature from refs/heads/master:.asf.yaml in doris-website!
An error occurred while running notifications feature in .asf.yaml!: Invalid notification target 'comm...@foo.apache.org'. Must be a valid @doris.apache.org list! - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (6c707828766 -> c8f1b9f4ae0)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 6c707828766 [feature](function) support ip functions named ipv4_to_ipv6 and cut_ipv6 (#36883) add c8f1b9f4ae0 [opt](hive) save hive table schema in transaction (#37008) No new revisions were added by this update. Summary of changes: .../org/apache/doris/datasource/hive/HMSTransaction.java | 15 +++ 1 file changed, 11 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch branch-2.1 updated: [branch-2.1][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37023)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 62c4451c971 [branch-2.1][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37023) 62c4451c971 is described below commit 62c4451c9718972347e8529fd71fcd9a3e37f45e Author: zy-kkk AuthorDate: Mon Jul 1 12:22:20 2024 +0800 [branch-2.1][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37023) pick (#36720) In many cases, we found that users would use JDBC Catalog to perform a large number of queries, which resulted in the maximum of 10 connections being insufficient, so I adjusted it to 30, which covered most needs. --- .../src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java | 2 +- .../src/test/java/org/apache/doris/catalog/JdbcResourceTest.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java index 5fdbc211ab0..a99377add25 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java @@ -31,7 +31,7 @@ public class JdbcDataSourceConfig { private TJdbcOperation op; private TOdbcTableType tableType; private int connectionPoolMinSize = 1; -private int connectionPoolMaxSize = 10; +private int connectionPoolMaxSize = 30; private int connectionPoolMaxWaitTime = 5000; private int connectionPoolMaxLifeTime = 180; private boolean connectionPoolKeepAlive = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index c411c6d1143..e8498d0a2d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -139,7 +139,7 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1"); -OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "10"); +OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "30"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "180"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java index 8e004d4b236..81c2157686a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java @@ -87,7 +87,7 @@ public class JdbcResourceTest { // Verify the default properties were applied during the replay Map properties = jdbcResource.getCopiedProperties(); Assert.assertEquals("1", 
properties.get("connection_pool_min_size")); -Assert.assertEquals("10", properties.get("connection_pool_max_size")); +Assert.assertEquals("30", properties.get("connection_pool_max_size")); Assert.assertEquals("180", properties.get("connection_pool_max_life_time")); Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); @@ -110,7 +110,7 @@ public class JdbcResourceTest { // Verify the default properties were applied during the replay Map properties = replayedResource.getCopiedProperties(); Assert.assertEquals("1", properties.get("connection_pool_min_size")); -Assert.assertEquals("10", properties.get("connection_pool_max_size")); +Assert.assertEquals("30", properties.get("connection_pool_max_size")); Assert.assertEquals("180", properties.get("connection_pool_max_life_time")); Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); - To unsubscribe, e-mail: commi
(doris) branch branch-2.0 updated: [branch-2.0][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37024)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.0 by this push: new 880260874e7 [branch-2.0][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37024) 880260874e7 is described below commit 880260874e72b7947299ba27d55f7f77f41377bc Author: zy-kkk AuthorDate: Mon Jul 1 12:30:08 2024 +0800 [branch-2.0][improvement](jdbc catalog) Modify the maximum number of connections in the connection pool to 30 by default (#37024) pick (#36720) In many cases, we found that users would use JDBC Catalog to perform a large number of queries, which resulted in the maximum of 10 connections being insufficient, so I adjusted it to 30, which covered most needs. --- .../src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java | 2 +- fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java | 2 +- .../src/test/java/org/apache/doris/catalog/JdbcResourceTest.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java index 5fdbc211ab0..a99377add25 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java @@ -31,7 +31,7 @@ public class JdbcDataSourceConfig { private TJdbcOperation op; private TOdbcTableType tableType; private int connectionPoolMinSize = 1; -private int connectionPoolMaxSize = 10; +private int connectionPoolMaxSize = 30; private int connectionPoolMaxWaitTime = 5000; private int connectionPoolMaxLifeTime = 180; private boolean connectionPoolKeepAlive = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 1faf27e1040..a5ee2c2b2a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -148,7 +148,7 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1"); -OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "10"); +OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_SIZE, "30"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "180"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java index b88597eca25..10e8036dad7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/JdbcResourceTest.java @@ -87,7 +87,7 @@ public class JdbcResourceTest { // Verify the default properties were applied during the replay Map properties = jdbcResource.getCopiedProperties(); Assert.assertEquals("1", 
properties.get("connection_pool_min_size")); -Assert.assertEquals("10", properties.get("connection_pool_max_size")); +Assert.assertEquals("30", properties.get("connection_pool_max_size")); Assert.assertEquals("180", properties.get("connection_pool_max_life_time")); Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); @@ -110,7 +110,7 @@ public class JdbcResourceTest { // Verify the default properties were applied during the replay Map properties = replayedResource.getCopiedProperties(); Assert.assertEquals("1", properties.get("connection_pool_min_size")); -Assert.assertEquals("10", properties.get("connection_pool_max_size")); +Assert.assertEquals("30", properties.get("connection_pool_max_size")); Assert.assertEquals("180", properties.get("connection_pool_max_life_time")); Assert.assertEquals("5000", properties.get("connection_pool_max_wait_time")); Assert.assertEquals("false", properties.get("connection_pool_keep_alive")); - To unsubscribe, e-mail: commi
(doris) branch branch-2.0 updated: [fix](jdbc catalog) fix jdbc table sink writer close nullptr and add log (#37021)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.0 by this push: new 250bca942a1 [fix](jdbc catalog) fix jdbc table sink writer close nullptr and add log (#37021) 250bca942a1 is described below commit 250bca942a118c2f0812e61b8a60a9f7e7de9062 Author: zy-kkk AuthorDate: Mon Jul 1 12:31:41 2024 +0800 [fix](jdbc catalog) fix jdbc table sink writer close nullptr and add log (#37021) fix jdbc table sink writer close nullptr and add log --- be/src/vec/sink/vjdbc_table_sink.cpp | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp index 2663dc236a1..bb3e1123ac5 100644 --- a/be/src/vec/sink/vjdbc_table_sink.cpp +++ b/be/src/vec/sink/vjdbc_table_sink.cpp @@ -112,7 +112,11 @@ Status VJdbcTableSink::close(RuntimeState* state, Status exec_status) { if (exec_status.ok() && _use_transaction) { RETURN_IF_ERROR(_writer->finish_trans()); } -RETURN_IF_ERROR(_writer->close()); +if (_writer == nullptr) { +LOG(WARNING) << "debug invalid nullptr writer when close"; +} else { +RETURN_IF_ERROR(_writer->close()); +} return DataSink::close(state, exec_status); } } // namespace vectorized - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated: [opt](function)avoid virtual function calls in geo functions (#37003)
This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 444f96aa913 [opt](function)avoid virtual function calls in geo functions (#37003) 444f96aa913 is described below commit 444f96aa9136f81ccd53244e0e41769f54f0e064 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Mon Jul 1 12:53:32 2024 +0800 [opt](function)avoid virtual function calls in geo functions (#37003) --- be/src/vec/functions/functions_geo.cpp | 285 + be/src/vec/functions/functions_geo.h | 5 +- 2 files changed, 189 insertions(+), 101 deletions(-) diff --git a/be/src/vec/functions/functions_geo.cpp b/be/src/vec/functions/functions_geo.cpp index 036033db2a2..b389bc1636e 100644 --- a/be/src/vec/functions/functions_geo.cpp +++ b/be/src/vec/functions/functions_geo.cpp @@ -26,6 +26,7 @@ #include "geo/geo_common.h" #include "geo/geo_types.h" #include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" #include "vec/columns/columns_number.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" @@ -33,6 +34,7 @@ #include "vec/core/field.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" #include "vec/functions/simple_function_factory.h" namespace doris::vectorized { @@ -41,6 +43,7 @@ struct StPoint { static constexpr auto NEED_CONTEXT = false; static constexpr auto NAME = "st_point"; static const size_t NUM_ARGS = 2; +using Type = DataTypeString; static Status execute(Block& block, const ColumnNumbers& arguments, size_t result) { DCHECK_EQ(arguments.size(), 2); auto return_type = block.get_data_type(result); @@ -52,26 +55,29 @@ struct StPoint { const auto size = std::max(left_column->size(), right_column->size()); -MutableColumnPtr res = return_type->create_column(); - +auto res = ColumnString::create(); +auto null_map = ColumnUInt8::create(size, 0); +auto& null_map_data = null_map->get_data(); GeoPoint point; std::string buf; if (left_const) { -const_vector(left_column, right_column, res, size, point, buf); +const_vector(left_column, right_column, res, null_map_data, size, point, buf); } else if (right_const) { -vector_const(left_column, right_column, res, size, point, buf); +vector_const(left_column, right_column, res, null_map_data, size, point, buf); } else { -vector_vector(left_column, right_column, res, size, point, buf); +vector_vector(left_column, right_column, res, null_map_data, size, point, buf); } -block.replace_by_position(result, std::move(res)); +block.replace_by_position(result, + ColumnNullable::create(std::move(res), std::move(null_map))); return Status::OK(); } -static void loop_do(GeoParseStatus& cur_res, MutableColumnPtr& res, GeoPoint& point, -std::string& buf) { +static void loop_do(GeoParseStatus& cur_res, ColumnString::MutablePtr& res, NullMap& null_map, +int row, GeoPoint& point, std::string& buf) { if (cur_res != GEO_PARSE_OK) { -res->insert_data(nullptr, 0); +null_map[row] = 1; +res->insert_default(); return; } @@ -81,32 +87,32 @@ struct StPoint { } static void const_vector(const ColumnPtr& left_column, const ColumnPtr& right_column, - MutableColumnPtr& res, const size_t size, GeoPoint& point, - std::string& buf) { + ColumnString::MutablePtr& res, NullMap& null_map, const size_t size, + GeoPoint& point, std::string& buf) { double x = left_column->operator[](0).get(); for (int row = 0; row < 
size; ++row) { auto cur_res = point.from_coord(x, right_column->operator[](row).get()); -loop_do(cur_res, res, point, buf); +loop_do(cur_res, res, null_map, row, point, buf); } } static void vector_const(const ColumnPtr& left_column, const ColumnPtr& right_column, - MutableColumnPtr& res, const size_t size, GeoPoint& point, - std::string& buf) { + ColumnString::MutablePtr& res, NullMap& null_map, const size_t size, + GeoPoint& point, std::string& buf) { double y = right_column->operator[](0).get(); for (int row = 0; row < size; ++row) { auto cur_res = point.from_coord(right_column->operator[](row).get(),
(doris) branch master updated: [fix](cloud) Update mtime only if partitions have updated time (#37055)
This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 686a1c6ebbb [fix](cloud) Update mtime only if partitions have updated time (#37055) 686a1c6ebbb is described below commit 686a1c6ebbb414a80e70404140d342bd83dfdd5d Author: Gavin Chou AuthorDate: Mon Jul 1 14:08:43 2024 +0800 [fix](cloud) Update mtime only if partitions have updated time (#37055) This PR fixes the incompatibility introduced by #34615 --- .../src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index 1246c5b640b..882bb7f6933 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -205,7 +205,9 @@ public class CloudPartition extends Partition { for (int i = 0; i < size; ++i) { Long version = versions.get(i); if (version > Partition.PARTITION_INIT_VERSION) { -partitions.get(i).setCachedVisibleVersion(versions.get(i), versionUpdateTimesMs.get(i)); +// For compatibility, the existing partitions may not have mtime +long mTime = versions.size() == versionUpdateTimesMs.size() ? versionUpdateTimesMs.get(i) : 0; +partitions.get(i).setCachedVisibleVersion(versions.get(i), mTime); } } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated (444f96aa913 -> b3c1ebbe162)
This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 444f96aa913 [opt](function)avoid virtual function calls in geo functions (#37003) add b3c1ebbe162 [fix](cloud) Allow access to MS during the replay (#37053) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java| 4 .../src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java | 6 -- 2 files changed, 10 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch branch-2.1 updated: [Feature](Prepared Statement) fix and enable enable_server_side_prepared_statement by default #36581 (#36818)
This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 14c991f09bc [Feature](Prepared Statement) fix and enable enable_server_side_prepared_statement by default #36581 (#36818) 14c991f09bc is described below commit 14c991f09bc0b7e1a2a2e64e5fcfdb3ad8d01d93 Author: lihangyu <15605149...@163.com> AuthorDate: Mon Jul 1 14:35:17 2024 +0800 [Feature](Prepared Statement) fix and enable enable_server_side_prepared_statement by default #36581 (#36818) picked from #36581 --- .../trees/plans/commands/PrepareCommand.java | 5 ++ .../java/org/apache/doris/qe/ConnectContext.java | 9 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 3 +- .../org/apache/doris/qe/PointQueryExecutor.java| 10 ++- .../java/org/apache/doris/qe/SessionVariable.java | 8 +- .../data/prepared_stmt_p0/prepared_stmt.out| 36 .../test_compaction_uniq_keys_row_store.groovy | 3 - .../insert_group_commit_with_exception.groovy | 19 ++-- .../insert_group_commit_with_prepare_stmt.groovy | 4 +- .../suites/point_query_p0/test_point_query.groovy | 2 - .../test_point_query_cluster_key.groovy| 25 +++--- .../test_point_query_partition.groovy | 4 +- .../suites/prepared_stmt_p0/prepared_stmt.groovy | 100 +++-- 13 files changed, 192 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java index 958fc470283..43778a1e005 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.nereids.trees.expressions.Placeholder; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -102,6 +103,10 @@ public class PrepareCommand extends Command { LOG.debug("add prepared statement {}, isBinaryProtocol {}", name, ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE); } +if (logicalPlan instanceof InsertIntoTableCommand +&& ((InsertIntoTableCommand) logicalPlan).getLabelName().isPresent()) { +throw new org.apache.doris.common.UserException("Only support prepare InsertStmt without label now"); +} ctx.addPreparedStatementContext(name, new PreparedStatementContext(this, ctx, ctx.getStatementContext(), name)); if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 16b1b3c2c83..6284275e73e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -35,6 +35,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import 
org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; @@ -394,12 +395,18 @@ public class ConnectContext { this.preparedStmtCtxs.put(stmtName, ctx); } -public void addPreparedStatementContext(String stmtName, PreparedStatementContext ctx) { +public void addPreparedStatementContext(String stmtName, PreparedStatementContext ctx) throws UserException { +if (this.preparedStatementContextMap.size() > sessionVariable.maxPreparedStmtCount) { +throw new UserException("Failed to create a server prepared statement" ++ "possibly because there are too many active prepared statements on server already." ++ "set max_prepared_stmt_count with larger number than " + sessionVariable.maxPreparedStmtCount); +} this.preparedStatementContextMap.put(stmtName, ctx); } public void removePrepareStmt(String stmtName) { this.preparedStmtCtxs.remove(stmtName); +this.preparedStatementContextMap.remove(stmtName); } public PrepareStmtContext getPreparedStmt(String stmtName) { diff --git a/fe/fe-cor
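Net effect for 2.1 users: server-side prepared statements are on by default, with a per-session cap on how many can be open at once. A sketch of the relevant knobs, assuming the usual SET syntax for session variables; the variable names come from the commit title and the new error message, and the numeric value is only an example:

-- opt a session out if a workload misbehaves with server-side prepare
SET enable_server_side_prepared_statement = false;
-- raise the cap mentioned in the new error message
SET max_prepared_stmt_count = 200000;

JDBC clients additionally need useServerPrepStmts=true (a MySQL Connector/J URL parameter) for the server-side prepare path to be exercised at all.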
(doris) branch master updated: [feat](Nereids) Optimize query by pushing down aggregation through join on foreign key (#36035)
This is an automated email from the ASF dual-hosted git repository. xiejiann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 6889225b19e [feat](Nereids) Optimize query by pushing down aggregation through join on foreign key (#36035) 6889225b19e is described below commit 6889225b19e5826d74582c518f3d38982a1e3886 Author: 谢健 AuthorDate: Mon Jul 1 14:37:23 2024 +0800 [feat](Nereids) Optimize query by pushing down aggregation through join on foreign key (#36035) ## Proposed changes This PR optimizes query performance by pushing down aggregations through joins when grouped by a foreign key. This adjustment reduces data processing overhead above the join, improving both speed and resource efficiency. Transformation Example: Before Optimization: ``` Aggregation(group by fk) | Join(pk = fk) / \ pk fk ``` After Optimization: ``` Join(pk = fk) / \ pk Aggregation(group by fk) | fk ``` --- .../doris/nereids/jobs/executor/Rewriter.java | 6 +- .../apache/doris/nereids/properties/FuncDeps.java | 19 ++ .../org/apache/doris/nereids/rules/RuleType.java | 2 +- .../rewrite/PushDownAggThroughJoinOnPkFk.java | 348 + .../rewrite/PushDownAggThroughJoinOnPkFkTest.java | 158 ++ .../shape/query38.out | 51 ++- .../shape/query87.out | 51 ++- .../noStatsRfPrune/query38.out | 51 ++- .../noStatsRfPrune/query87.out | 51 ++- .../no_stats_shape/query38.out | 51 ++- .../no_stats_shape/query87.out | 51 ++- .../rf_prune/query38.out | 51 ++- .../rf_prune/query87.out | 51 ++- .../nereids_tpcds_shape_sf100_p0/shape/query38.out | 51 ++- .../nereids_tpcds_shape_sf100_p0/shape/query87.out | 51 ++- 15 files changed, 770 insertions(+), 273 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 9505bdca87d..0a2906ca055 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -110,6 +110,7 @@ import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoEsScan; import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoJdbcScan; import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoOdbcScan; import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoin; +import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOnPkFk; import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide; import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin; import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject; @@ -348,8 +349,9 @@ public class Rewriter extends AbstractBatchJobExecutor { ), // this rule should be invoked after topic "Join pull up" -topic("eliminate group by keys according to fd items", -topDown(new EliminateGroupByKey()) +topic("eliminate Aggregate according to fd items", +topDown(new EliminateGroupByKey()), +topDown(new PushDownAggThroughJoinOnPkFk()) ), topic("Limit optimization", diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java index c17fd2eee57..be7b0853605 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/FuncDeps.java @@ -62,6 +62,7 @@ public class FuncDeps { } private 
final Set items; +// determinants -> dependencies private final Map, Set>> edges; public FuncDeps() { @@ -159,6 +160,24 @@ public class FuncDeps { return items.contains(new FuncDepsItem(dominate, dependency)); } +public boolean isCircleDeps(Set dominate, Set dependency) { +return items.contains(new FuncDepsItem(dominate, dependency)) +&& items.contains(new FuncDepsItem(dependency, dominate)); +} + +/** + * find the determinants of dependencies + */ +public Set> findDeterminats(Set dependency) { +Set> determinants = new HashSet<>(); +for (FuncDepsItem item : items) { +if (item.dependencies.equals(dependency)) { +determinants.add(item.determinants); +} +} +return determinants; +} + @Overri
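A concrete, entirely hypothetical query of the shape the rule targets, assuming orders.customer_id is a non-null foreign key referencing customers.id, so every orders row matches exactly one customers row and pre-aggregating below the join cannot change the result:

-- before the rewrite the full join output is aggregated;
-- after it, orders is pre-aggregated on the FK and the smaller result is joined to customers
SELECT o.customer_id, SUM(o.amount) AS total_amount
FROM orders o
JOIN customers c ON c.id = o.customer_id   -- pk = fk, as in the plan sketch above
GROUP BY o.customer_id;

Whether the optimizer actually fires the rewrite depends on the declared primary/foreign-key constraints, as the rule name PushDownAggThroughJoinOnPkFk suggests.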
(doris) branch master updated: [Migrate-Test](multi-catalog) Migrate p2 tests from p2 to p0. (#36989)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 50c64bc3cdf [Migrate-Test](multi-catalog) Migrate p2 tests from p2 to p0. (#36989) 50c64bc3cdf is described below commit 50c64bc3cdf654194bbaa51a6a8c06722253790f Author: Qi Chen AuthorDate: Mon Jul 1 14:45:50 2024 +0800 [Migrate-Test](multi-catalog) Migrate p2 tests from p2 to p0. (#36989) ## Proposed changes [Migrate-Test] (multi-catalog) Migrate p2 tests from p2 to p0. - Migrate p2 tests from p2 to p0. - Set health check of hms docker to 10s(interval) * 120(retries). - Remove duplicated tables in `create_preinstalled_table.hql` by adding new scripts. --- .../docker-compose/hive/hive-2x.yaml.tpl | 2 +- .../docker-compose/hive/hive-3x.yaml.tpl | 2 +- .../hive/scripts/create_preinstalled_table.hql | 108 .../data/default/account_fund/create_table.hql | 28 + .../scripts/data/default/account_fund/data.tar.gz | Bin 0 -> 234 bytes .../hive/scripts/data/default/account_fund/run.sh | 12 ++ .../scripts/data/default/hive01/create_table.hql | 22 .../hive/scripts/data/default/hive01/data.tar.gz | Bin 0 -> 186 bytes .../hive/scripts/data/default/hive01/run.sh| 12 ++ .../data/default/sale_table/create_table.hql | 24 .../scripts/data/default/sale_table/data.tar.gz| Bin 0 -> 221 bytes .../hive/scripts/data/default/sale_table/run.sh| 12 ++ .../data/default/string_table/create_table.hql | 27 .../scripts/data/default/string_table/data.tar.gz | Bin 0 -> 260 bytes .../hive/scripts/data/default/string_table/run.sh | 12 ++ .../scripts/data/default/student/create_table.hql | 24 .../hive/scripts/data/default/student/data.tar.gz | Bin 0 -> 210 bytes .../hive/scripts/data/default/student/run.sh | 12 ++ .../scripts/data/default/test1/create_table.hql| 23 .../hive/scripts/data/default/test1/data.tar.gz| Bin 0 -> 211 bytes .../hive/scripts/data/default/test1/run.sh | 12 ++ .../scripts/data/default/test2/create_table.hql| 23 .../hive/scripts/data/default/test2/data.tar.gz| Bin 0 -> 197 bytes .../hive/scripts/data/default/test2/run.sh | 12 ++ .../data/default/test_hive_doris/create_table.hql | 20 +++ .../data/default/test_hive_doris/data.tar.gz | Bin 0 -> 181 bytes .../scripts/data/default/test_hive_doris/run.sh| 12 ++ .../par_fields_in_file_orc/create_table.hql| 21 .../par_fields_in_file_orc/data.tar.gz | Bin 0 -> 751 bytes .../multi_catalog/par_fields_in_file_orc/run.sh| 12 ++ .../par_fields_in_file_parquet/create_table.hql| 21 .../par_fields_in_file_parquet/data.tar.gz | Bin 0 -> 548 bytes .../par_fields_in_file_parquet/run.sh | 12 ++ .../partition_location_1/create_table.hql | 22 .../multi_catalog/partition_location_1/data.tar.gz | Bin 0 -> 583 bytes .../data/multi_catalog/partition_location_1/run.sh | 12 ++ .../partition_location_2/create_table.hql | 23 .../multi_catalog/partition_location_2/data.tar.gz | Bin 0 -> 600 bytes .../data/multi_catalog/partition_location_2/run.sh | 12 ++ .../timestamp_with_time_zone/create_table.hql | 17 +++ .../timestamp_with_time_zone/data.tar.gz | Bin 0 -> 1499 bytes .../multi_catalog/timestamp_with_time_zone/run.sh | 12 ++ .../scripts/data/test/hive_test/create_table.hql | 20 +++ .../hive/scripts/data/test/hive_test/data.tar.gz | Bin 0 -> 161 bytes .../hive/scripts/data/test/hive_test/run.sh| 12 ++ .../hive/test_external_catalog_hive.out| 139 +++-- .../hive/test_hive_partition_location.out | 40 ++ .../hive/test_external_catalog_hive.groovy | 
99 --- .../hive/test_hive_partition_location.groovy | 18 +-- 49 files changed, 692 insertions(+), 199 deletions(-) diff --git a/docker/thirdparties/docker-compose/hive/hive-2x.yaml.tpl b/docker/thirdparties/docker-compose/hive/hive-2x.yaml.tpl index ca0fe2e9ddb..0aec9ec2365 100644 --- a/docker/thirdparties/docker-compose/hive/hive-2x.yaml.tpl +++ b/docker/thirdparties/docker-compose/hive/hive-2x.yaml.tpl @@ -89,7 +89,7 @@ services: - hive-metastore-postgresql healthcheck: test: ["CMD", "sh", "-c", "/mnt/scripts/healthy_check.sh"] - interval: 5s + interval: 10s timeout: 60s retries: 120 network_mode: "host" diff --git a/docker/thirdparties/docker-compose/hive/hive-3x.yaml.tpl b/docker/thirdparties/docker-compose/hive/hive-3x.yaml.tpl index 09d150c17b2..901e5b3f71a 100644 --- a/docker/thirdparties/docker-compose/hive/hive-3x.yaml.tpl +++ b/docker/thirdparties/docker-compose/hive/hive-3x.yaml.tpl @@ -89,
(doris) branch branch-2.1 updated: [feat](Nereids) Optimize Sum Literal Rewriting by Excluding Single Instances (#35559) (#37047)
This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 24d236b210d [feat](Nereids) Optimize Sum Literal Rewriting by Excluding Single Instances (#35559) (#37047) 24d236b210d is described below commit 24d236b210d956cde48a187958057a578838f03a Author: 谢健 AuthorDate: Mon Jul 1 14:57:15 2024 +0800 [feat](Nereids) Optimize Sum Literal Rewriting by Excluding Single Instances (#35559) (#37047) pick from master #35559 This PR introduces a change in the method removeOneSumLiteral to enhance the performance of sum literal rewriting in SQL queries. The modification ensures that sum literals appearing only once, such as in expressions like select count(id1 + 1), count(id2 + 1) from t, are not rewritten. --- .../nereids/rules/rewrite/SumLiteralRewrite.java | 25 +++-- .../rules/rewrite/SumLiteralRewriteTest.java | 31 ++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewrite.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewrite.java index c99071a714e..dcc64ce2c1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewrite.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewrite.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; @@ -64,13 +65,33 @@ public class SumLiteralRewrite extends OneRewriteRuleFactory { } sumLiteralMap.put(pel.first, pel.second); } -if (sumLiteralMap.isEmpty()) { +Map> validSumLiteralMap = +removeOneSumLiteral(sumLiteralMap); +if (validSumLiteralMap.isEmpty()) { return null; } -return rewriteSumLiteral(agg, sumLiteralMap); +return rewriteSumLiteral(agg, validSumLiteralMap); }).toRule(RuleType.SUM_LITERAL_REWRITE); } +// when there only one sum literal like select count(id1 + 1), count(id2 + 1) from t, we don't rewrite them. 
+private Map> removeOneSumLiteral( +Map> sumLiteralMap) { +Map countSum = new HashMap<>(); +for (Entry> e : sumLiteralMap.entrySet()) { +Expression expr = e.getValue().first.expr; +countSum.merge(expr, 1, Integer::sum); +} +Map> validSumLiteralMap = new HashMap<>(); +for (Entry> e : sumLiteralMap.entrySet()) { +Expression expr = e.getValue().first.expr; +if (countSum.get(expr) > 1) { +validSumLiteralMap.put(e.getKey(), e.getValue()); +} +} +return validSumLiteralMap; +} + private Plan rewriteSumLiteral( LogicalAggregate agg, Map> sumLiteralMap) { Set newAggOutput = new HashSet<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewriteTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewriteTest.java index cb2cc77627e..19ea7b864fb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewriteTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SumLiteralRewriteTest.java @@ -112,4 +112,35 @@ class SumLiteralRewriteTest implements MemoPatternMatchSupported { .printlnTree() .matches(logicalAggregate().when(p -> p.getOutputs().size() == 4)); } + +@Test +void testSumOnce() { +Slot slot1 = scan1.getOutput().get(0); +Alias add1 = new Alias(new Sum(false, true, new Add(slot1, Literal.of(1; +LogicalAggregate agg = new LogicalAggregate<>( +ImmutableList.of(scan1.getOutput().get(0)), ImmutableList.of(add1), scan1); +PlanChecker.from(MemoTestUtils.createConnectContext(), agg) +.applyTopDown(ImmutableList.of(new SumLiteralRewrite().build())) +.printlnTree() +.matches(logicalAggregate().when(p -> p.getOutputs().size() == 1)); + +Slot slot2 = new Alias(scan1.getOutput().get(0)).toSlot(); +Alias add2 = new Alias(new Sum(false, true, new Add(slot2, Literal.of(2; +agg = new LogicalAggregate<>( +ImmutableList.of(scan1.getOutput().get(0)), ImmutableList.of(add1, add2), scan1); +PlanChecker.from(MemoTestUtils.createConnectContext(), agg) +.applyTopDown(Immutabl
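Put differently, the rewrite only pays off when the same column-plus-literal pattern occurs in several aggregates, so the shared sum and count can be computed once. A hypothetical before/after sketch (table and column names invented; the factored form assumes the rule expands sum(col + c) into sum(col) + c * count(col)):

-- still rewritten: the id1 pattern appears twice, so SUM(id1) and COUNT(id1) are shared
SELECT SUM(id1 + 1), SUM(id1 + 2) FROM t;
--   becomes roughly: SELECT SUM(id1) + 1 * COUNT(id1), SUM(id1) + 2 * COUNT(id1) FROM t;

-- left untouched after this change: each pattern appears only once
SELECT SUM(id1 + 1), SUM(id2 + 1) FROM t;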
(doris) branch master updated: [opt](arena) lazy memory allocation in arena (#36498)
This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 127987f307f [opt](arena) lazy memory allocation in arena (#36498) 127987f307f is described below commit 127987f307f2815bd65f0be002738431b68b4ec8 Author: zhiqiang AuthorDate: Mon Jul 1 14:57:38 2024 +0800 [opt](arena) lazy memory allocation in arena (#36498) Arena should not allocate memory in this constructor. After this pr merged, we shuold revert https://github.com/apache/doris/pull/36299 --- be/src/vec/common/arena.h | 59 +-- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/be/src/vec/common/arena.h b/be/src/vec/common/arena.h index 4ab3ee4c606..65e8c1dfabe 100644 --- a/be/src/vec/common/arena.h +++ b/be/src/vec/common/arena.h @@ -84,20 +84,22 @@ private: size_t used() const { return pos - begin; } }; -size_t growth_factor; -size_t linear_growth_threshold; +size_t growth_factor = 2; +size_t linear_growth_threshold = 128 * 1024 * 1024; /// Last contiguous chunk of memory. Chunk* head = nullptr; -size_t size_in_bytes; +size_t size_in_bytes = 0; +size_t _initial_size = 4096; // The memory used by all chunks, excluding head. -size_t _used_size_no_head; +size_t _used_size_no_head = 0; static size_t round_up_to_page_size(size_t s) { return (s + 4096 - 1) / 4096 * 4096; } /// If chunks size is less than 'linear_growth_threshold', then use exponential growth, otherwise - linear growth /// (to not allocate too much excessive memory). -size_t next_size(size_t min_next_size) const { +size_t next_size(size_t min_next_size) { +DCHECK(head != nullptr); size_t size_after_grow = 0; if (head->size() < linear_growth_threshold) { @@ -120,12 +122,20 @@ private: } /// Add next contiguous chunk of memory with size not less than specified. -void NO_INLINE add_chunk(size_t min_size) { +void NO_INLINE _add_chunk(size_t min_size) { +DCHECK(head != nullptr); _used_size_no_head += head->used(); head = new Chunk(next_size(min_size + pad_right), head); size_in_bytes += head->size(); } +void _init_head_if_needed() { +if (UNLIKELY(head == nullptr)) { +head = new Chunk(_initial_size, nullptr); +size_in_bytes += head->size(); +} +} + friend class ArenaAllocator; template friend class AlignedArenaAllocator; @@ -135,15 +145,18 @@ public: size_t linear_growth_threshold_ = 128 * 1024 * 1024) : growth_factor(growth_factor_), linear_growth_threshold(linear_growth_threshold_), - head(new Chunk(initial_size_, nullptr)), - size_in_bytes(head->size()), + _initial_size(initial_size_), _used_size_no_head(0) {} ~Arena() { delete head; } /// Get piece of memory, without alignment. char* alloc(size_t size) { -if (UNLIKELY(head->pos + size > head->end)) add_chunk(size); +_init_head_if_needed(); + +if (UNLIKELY(head->pos + size > head->end)) { +_add_chunk(size); +} char* res = head->pos; head->pos += size; @@ -153,6 +166,8 @@ public: /// Get piece of memory with alignment char* aligned_alloc(size_t size, size_t alignment) { +_init_head_if_needed(); + do { void* head_pos = head->pos; size_t space = head->end - head->pos; @@ -165,7 +180,7 @@ public: return res; } -add_chunk(size + alignment); +_add_chunk(size + alignment); } while (true); } @@ -180,6 +195,8 @@ public: * the allocation it intended to roll back was indeed the last one. 
*/ void* rollback(size_t size) { +DCHECK(head != nullptr); + head->pos -= size; ASAN_POISON_MEMORY_REGION(head->pos, size + pad_right); return head->pos; @@ -208,6 +225,8 @@ public: return result; } +DCHECK(head != nullptr); + // Extend an existing memory range with 'additional_bytes'. // This method only works for extending the last allocation. For lack of @@ -291,6 +310,10 @@ public: * and only 128M can be reused when you apply for 4G memory again. */ void clear() { +if (head == nullptr) { +return; +} + if (head->prev) { delete head->prev; head->prev = nullptr; @@ -303,9 +326,21 @@ public: /// Size of chunks in bytes. size_t size() const { return size_in_bytes; } -size_t used_size() const { return _used_size_no_head + head->used(); } +si