This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new f2aa5f32b8 [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change (#9811) f2aa5f32b8 is described below commit f2aa5f32b8ab81308e8d27d25121a14e41961706 Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jun 7 15:04:57 2022 +0800 [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change (#9811) Some pre-refactorings or interface additions for schema change --- be/src/olap/column_mapping.h | 14 ++++--- be/src/olap/memtable.cpp | 14 ++----- be/src/olap/memtable.h | 10 ++--- be/src/olap/olap_common.h | 27 +++++++------ be/src/olap/rowset/alpha_rowset_writer.h | 9 ++--- be/src/olap/rowset/beta_rowset_reader.h | 5 +-- be/src/olap/rowset/beta_rowset_writer.h | 7 +--- be/src/olap/rowset/rowset.h | 17 ++++----- be/src/olap/rowset/rowset_writer.h | 8 +--- be/src/olap/rowset/rowset_writer_context.h | 28 ++++++++++++-- be/src/olap/storage_engine.cpp | 29 +++++--------- be/src/olap/storage_engine.h | 5 ++- be/src/olap/tablet.h | 3 +- be/src/olap/tablet_schema.cpp | 22 ++++++++++- be/src/olap/tablet_schema.h | 7 +++- be/src/vec/common/cow.h | 33 ++++++++-------- be/src/vec/common/string_ref.h | 2 + be/src/vec/exprs/vexpr_context.cpp | 12 ++++-- be/src/vec/exprs/vexpr_context.h | 4 +- be/src/vec/functions/function_hash.cpp | 2 +- be/src/vec/functions/function_ifnull.cpp | 2 +- be/src/vec/functions/functions_geo.cpp | 2 +- be/src/vec/functions/simple_function_factory.h | 12 +++--- be/src/vec/olap/block_reader.cpp | 24 +++--------- be/src/vec/olap/olap_data_convertor.cpp | 52 +++++--------------------- be/src/vec/olap/olap_data_convertor.h | 6 +++ 26 files changed, 173 insertions(+), 183 deletions(-) diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 929a1b3980..de82705a91 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -15,16 +15,18 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H -#define DORIS_BE_SRC_OLAP_COLUMN_MAPPING_H +#pragma once +#include <gen_cpp/Exprs_types.h> + +#include <memory> namespace doris { class WrapperField; struct ColumnMapping { ColumnMapping() : ref_column(-1), default_value(nullptr) {} - virtual ~ColumnMapping() {} + virtual ~ColumnMapping() = default; // <0: use default value // >=0: use origin column @@ -33,9 +35,9 @@ struct ColumnMapping { WrapperField* default_value; // materialize view transform function used in schema change std::string materialized_function; + std::shared_ptr<TExpr> expr; }; -typedef std::vector<ColumnMapping> SchemaMapping; +using SchemaMapping = std::vector<ColumnMapping>; -} // namespace doris -#endif // DORIS_BE_SRC_COLUMN_MAPPING_H +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index e2fefd0149..c8c3d78e2a 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -91,17 +91,9 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript void MemTable::_init_agg_functions(const vectorized::Block* block) { for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) { - FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation(); - std::string agg_name = TabletColumn::get_string_by_aggregation_type(agg_method) + - vectorized::AGG_LOAD_SUFFIX; - std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), - [](unsigned char c) { return std::tolower(c); }); - - // create aggregate function - vectorized::DataTypes argument_types {block->get_data_type(cid)}; vectorized::AggregateFunctionPtr function = - vectorized::AggregateFunctionSimpleFactory::instance().get( - agg_name, argument_types, {}, argument_types.back()->is_nullable()); + _tablet_schema->column(cid).get_aggregate_function({block->get_data_type(cid)}, + vectorized::AGG_LOAD_SUFFIX); DCHECK(function != nullptr); _agg_functions[cid] = function; @@ -316,7 +308,7 @@ void MemTable::shrink_memtable_by_agg() { _collect_vskiplist_results<false>(); } -bool MemTable::is_flush() { +bool MemTable::is_flush() const { return memory_usage() >= config::write_buffer_size; } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index d8f16ba4c5..c0b4c92839 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -56,7 +56,7 @@ public: void shrink_memtable_by_agg(); - bool is_flush(); + bool is_flush() const; bool need_to_agg(); @@ -72,7 +72,7 @@ private: class RowCursorComparator : public RowComparator { public: RowCursorComparator(const Schema* schema); - int operator()(const char* left, const char* right) const; + int operator()(const char* left, const char* right) const override; private: const Schema* _schema; @@ -121,9 +121,9 @@ private: }; private: - typedef SkipList<char*, RowComparator> Table; - typedef Table::key_type TableKey; - typedef SkipList<RowInBlock*, RowInBlockComparator> VecTable; + using Table = SkipList<char*, RowComparator>; + using TableKey = Table::key_type; + using VecTable = SkipList<RowInBlock*, RowInBlockComparator>; public: /// The iterator of memtable, so that the data in this memtable diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index c8256d0f2e..304157e4e4 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -36,17 +36,16 @@ #include "util/hash_util.hpp" #include "util/uid_util.h" -#define LOW_56_BITS 0x00ffffffffffffff - namespace doris { -static const int64_t MAX_ROWSET_ID = 1L << 56; +static constexpr int64_t MAX_ROWSET_ID = 1L << 56; +static constexpr int64_t LOW_56_BITS = 0x00ffffffffffffff; -typedef int32_t SchemaHash; -typedef __int128 int128_t; -typedef unsigned __int128 uint128_t; +using SchemaHash = int32_t; +using int128_t = __int128; +using uint128_t = unsigned __int128; -typedef UniqueId TabletUid; +using TabletUid = UniqueId; enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 }; @@ -200,6 +199,12 @@ struct Version { Version(int64_t first_, int64_t second_) : first(first_), second(second_) {} Version() : first(0), second(0) {} + static Version mock() { + // Every time SchemaChange is used for external rowing, some temporary versions (such as 999, 1000, 1001) will be written, in order to avoid Cache conflicts, temporary + // The version number takes a BIG NUMBER plus the version number of the current SchemaChange + return Version(1 << 28, 1 << 29); + } + friend std::ostream& operator<<(std::ostream& os, const Version& version); bool operator!=(const Version& rhs) const { return first != rhs.first || second != rhs.second; } @@ -211,7 +216,7 @@ struct Version { } }; -typedef std::vector<Version> Versions; +using Versions = std::vector<Version>; inline std::ostream& operator<<(std::ostream& os, const Version& version) { return os << "[" << version.first << "-" << version.second << "]"; @@ -305,11 +310,11 @@ struct OlapReaderStatistics { int64_t general_debug_ns[GENERAL_DEBUG_COUNT] = {}; }; -typedef uint32_t ColumnId; +using ColumnId = uint32_t; // Column unique id set -typedef std::set<uint32_t> UniqueIdSet; +using UniqueIdSet = std::set<uint32_t>; // Column unique Id -> column id map -typedef std::map<ColumnId, ColumnId> UniqueIdToColumnIdMap; +using UniqueIdToColumnIdMap = std::map<ColumnId, ColumnId>; // 8 bit rowset id version // 56 bit, inc number from 1 diff --git a/be/src/olap/rowset/alpha_rowset_writer.h b/be/src/olap/rowset/alpha_rowset_writer.h index a4f7dfa479..411beabf62 100644 --- a/be/src/olap/rowset/alpha_rowset_writer.h +++ b/be/src/olap/rowset/alpha_rowset_writer.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H -#define DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H +#pragma once #include <vector> @@ -31,7 +30,7 @@ enum WriterState { WRITER_CREATED, WRITER_INITED, WRITER_FLUSHED }; class AlphaRowsetWriter : public RowsetWriter { public: AlphaRowsetWriter(); - virtual ~AlphaRowsetWriter(); + ~AlphaRowsetWriter() override; Status init(const RowsetWriterContext& rowset_writer_context) override; @@ -51,7 +50,7 @@ public: Version version() override { return _rowset_writer_context.version; } - int64_t num_rows() override { return _num_rows_written; } + int64_t num_rows() const override { return _num_rows_written; } RowsetId rowset_id() override { return _rowset_writer_context.rowset_id; } @@ -84,5 +83,3 @@ private: }; } // namespace doris - -#endif // DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_WRITER_H diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 7ba7b6a945..885d16b8c7 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H -#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H +#pragma once #include "olap/iterators.h" #include "olap/row_block.h" @@ -73,5 +72,3 @@ private: }; } // namespace doris - -#endif //DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 570a8f7650..69a957a3ce 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H -#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H +#pragma once #include "olap/rowset/rowset_writer.h" #include "vector" @@ -63,7 +62,7 @@ public: Version version() override { return _context.version; } - int64_t num_rows() override { return _num_rows_written; } + int64_t num_rows() const override { return _num_rows_written; } RowsetId rowset_id() override { return _context.rowset_id; } @@ -103,5 +102,3 @@ private: }; } // namespace doris - -#endif //DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_WRITER_H diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 8ca1ee5551..f1fded38ba 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H -#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H +#pragma once #include <atomic> #include <memory> @@ -106,7 +105,7 @@ private: class Rowset : public std::enable_shared_from_this<Rowset> { public: - virtual ~Rowset() {} + virtual ~Rowset() = default; // Open all segment files in this rowset and load necessary metadata. // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now @@ -147,15 +146,15 @@ public: size_t num_rows() const { return rowset_meta()->num_rows(); } Version version() const { return rowset_meta()->version(); } RowsetId rowset_id() const { return rowset_meta()->rowset_id(); } - int64_t creation_time() { return rowset_meta()->creation_time(); } + int64_t creation_time() const { return rowset_meta()->creation_time(); } PUniqueId load_id() const { return rowset_meta()->load_id(); } int64_t txn_id() const { return rowset_meta()->txn_id(); } int64_t partition_id() const { return rowset_meta()->partition_id(); } // flag for push delete rowset bool delete_flag() const { return rowset_meta()->delete_flag(); } int64_t num_segments() const { return rowset_meta()->num_segments(); } - void to_rowset_pb(RowsetMetaPB* rs_meta) { return rowset_meta()->to_rowset_pb(rs_meta); } - const RowsetMetaPB& get_rowset_pb() { return rowset_meta()->get_rowset_pb(); } + void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); } + const RowsetMetaPB& get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); } KeysType keys_type() { return _schema->keys_type(); } // remove all files in this rowset @@ -219,7 +218,9 @@ public: void set_need_delete_file() { _need_delete_file = true; } - bool contains_version(Version version) { return rowset_meta()->version().contains(version); } + bool contains_version(Version version) const { + return rowset_meta()->version().contains(version); + } FilePathDesc rowset_path_desc() { return _rowset_path_desc; } @@ -290,5 +291,3 @@ protected: }; } // namespace doris - -#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_H diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 22239f4eaf..49824cac01 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H -#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H +#pragma once #include "gen_cpp/olap_file.pb.h" #include "gen_cpp/types.pb.h" @@ -30,7 +29,6 @@ namespace doris { struct ContiguousRow; class MemTable; -class RowCursor; class RowsetWriter { public: @@ -74,7 +72,7 @@ public: virtual Version version() = 0; - virtual int64_t num_rows() = 0; + virtual int64_t num_rows() const = 0; virtual RowsetId rowset_id() = 0; @@ -85,5 +83,3 @@ private: }; } // namespace doris - -#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_H diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index b93c4633d6..539845fe5f 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H -#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H +#pragma once #include "gen_cpp/olap_file.pb.h" #include "olap/data_dir.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" #include "olap/tablet_schema.h" namespace doris { @@ -42,6 +43,27 @@ struct RowsetWriterContext { load_id.set_hi(0); load_id.set_lo(0); } + + static RowsetWriterContext create(const Version& version, TabletSharedPtr new_tablet, + RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap) { + RowsetWriterContext context; + context.rowset_id = StorageEngine::instance()->next_rowset_id(); + context.tablet_uid = new_tablet->tablet_uid(); + context.tablet_id = new_tablet->tablet_id(); + context.partition_id = new_tablet->partition_id(); + context.tablet_schema_hash = new_tablet->schema_hash(); + context.rowset_type = new_rowset_type; + context.path_desc = new_tablet->tablet_path_desc(); + context.tablet_schema = &(new_tablet->tablet_schema()); + context.data_dir = new_tablet->data_dir(); + context.rowset_state = VISIBLE; + context.version = version; + context.segments_overlap = segments_overlap; + + return context; + } + RowsetId rowset_id; int64_t tablet_id; int64_t tablet_schema_hash; @@ -74,5 +96,3 @@ struct RowsetWriterContext { }; } // namespace doris - -#endif // DORIS_BE_SRC_OLAP_ROWSET_ROWSET_WRITER_CONTEXT_H diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 35dc3dd1e1..c872851352 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -248,7 +248,7 @@ Status StorageEngine::_init_store_map() { _store_map.emplace(store->path(), store); } - std::string stream_load_record_path = ""; + std::string stream_load_record_path; if (!tmp_stores.empty()) { stream_load_record_path = tmp_stores[0]->path(); } @@ -1080,7 +1080,8 @@ void StorageEngine::notify_listeners() { } Status StorageEngine::execute_task(EngineTask* task) { - { + auto lock_related_tablets = [&]() -> std::vector<std::unique_lock<std::shared_mutex>> { + // add write lock to all related tablets std::vector<TabletInfo> tablet_infos; task->get_related_tablets(&tablet_infos); sort(tablet_infos.begin(), tablet_infos.end()); @@ -1096,7 +1097,11 @@ Status StorageEngine::execute_task(EngineTask* task) { << tablet_info.tablet_id; } } - // add write lock to all related tablets + return wrlocks; + }; + + { + auto wrlocks = lock_related_tablets(); Status prepare_status = task->prepare(); if (prepare_status != Status::OK()) { return prepare_status; @@ -1110,23 +1115,7 @@ Status StorageEngine::execute_task(EngineTask* task) { } { - std::vector<TabletInfo> tablet_infos; - // related tablets may be changed after execute task, so that get them here again - task->get_related_tablets(&tablet_infos); - sort(tablet_infos.begin(), tablet_infos.end()); - std::vector<TabletSharedPtr> related_tablets; - std::vector<std::unique_lock<std::shared_mutex>> wrlocks; - for (TabletInfo& tablet_info : tablet_infos) { - TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_info.tablet_id); - if (tablet != nullptr) { - related_tablets.push_back(tablet); - wrlocks.push_back(std::unique_lock<std::shared_mutex>(tablet->get_header_lock())); - } else { - LOG(WARNING) << "could not get tablet before finish tabletid: " - << tablet_info.tablet_id; - } - } - // add write lock to all related tablets + auto wrlocks = lock_related_tablets(); Status fin_status = task->finish(); return fin_status; } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index ce33223c5a..df6e78874b 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -42,7 +42,6 @@ #include "olap/olap_meta.h" #include "olap/options.h" #include "olap/rowset/rowset_id_generator.h" -#include "olap/tablet.h" #include "olap/tablet_manager.h" #include "olap/task/engine_task.h" #include "olap/txn_manager.h" @@ -104,7 +103,9 @@ public: std::vector<DataDir*> get_stores_for_create_tablet(TStorageMedium::type storage_medium); DataDir* get_store(const std::string& path); - uint32_t available_storage_medium_type_count() { return _available_storage_medium_type_count; } + uint32_t available_storage_medium_type_count() const { + return _available_storage_medium_type_count; + } Status set_cluster_id(int32_t cluster_id); int32_t effective_cluster_id() const { return _effective_cluster_id; } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a7f1830ef1..80ebfed636 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -33,7 +33,6 @@ #include "olap/olap_define.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_reader.h" -#include "olap/rowset/rowset_writer.h" #include "olap/tablet_meta.h" #include "olap/tuple.h" #include "olap/utils.h" @@ -48,6 +47,8 @@ class TabletMeta; class CumulativeCompactionPolicy; class CumulativeCompaction; class BaseCompaction; +class RowsetWriter; +struct RowsetWriterContext; using TabletSharedPtr = std::shared_ptr<Tablet>; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 6e9703f569..683dff9d62 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -18,8 +18,9 @@ #include "olap/tablet_schema.h" #include "tablet_meta.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/core/block.h" -#include "vec/data_types/data_type.h" #include "vec/data_types/data_type_factory.hpp" namespace doris { @@ -405,6 +406,16 @@ void TabletColumn::add_sub_column(TabletColumn& sub_column) { _sub_column_count += 1; } +vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function( + vectorized::DataTypes argument_types, std::string suffix) const { + std::string agg_name = TabletColumn::get_string_by_aggregation_type(_aggregation) + suffix; + std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), + [](unsigned char c) { return std::tolower(c); }); + + return vectorized::AggregateFunctionSimpleFactory::instance().get( + agg_name, argument_types, {}, argument_types.back()->is_nullable()); +} + void TabletSchema::init_from_pb(const TabletSchemaPB& schema) { _keys_type = schema.keys_type(); _num_columns = 0; @@ -525,6 +536,15 @@ vectorized::Block TabletSchema::create_block( return block; } +vectorized::Block TabletSchema::create_block() const { + vectorized::Block block; + for (const auto& col : _cols) { + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(col); + block.insert({data_type->create_column(), data_type, col.name()}); + } + return block; +} + bool operator==(const TabletColumn& a, const TabletColumn& b) { if (a._unique_id != b._unique_id) return false; if (a._col_name != b._col_name) return false; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 7c3209ee82..f0a5b72157 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -23,6 +23,8 @@ #include "gen_cpp/segment_v2.pb.h" #include "olap/olap_define.h" #include "olap/types.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/data_types/data_type.h" namespace doris { namespace vectorized { @@ -62,9 +64,11 @@ public: size_t index_length() const { return _index_length; } void set_index_length(size_t index_length) { _index_length = index_length; } FieldAggregationMethod aggregation() const { return _aggregation; } + vectorized::AggregateFunctionPtr get_aggregate_function(vectorized::DataTypes argument_types, + std::string suffix) const; int precision() const { return _precision; } int frac() const { return _frac; } - bool visible() { return _visible; } + bool visible() const { return _visible; } // Add a sub column. void add_sub_column(TabletColumn& sub_column); @@ -151,6 +155,7 @@ public: vectorized::Block create_block( const std::vector<uint32_t>& return_columns, const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const; + vectorized::Block create_block() const; private: // Only for unit test. diff --git a/be/src/vec/common/cow.h b/be/src/vec/common/cow.h index 27f3e864a6..1bab30ddc2 100644 --- a/be/src/vec/common/cow.h +++ b/be/src/vec/common/cow.h @@ -20,31 +20,25 @@ #pragma once -#include <boost/smart_ptr/intrusive_ptr.hpp> -#include <boost/smart_ptr/intrusive_ref_counter.hpp> +#include <atomic> #include <initializer_list> /** Copy-on-write shared ptr. * Allows to work with shared immutable objects and sometimes unshare and mutate you own unique copy. * * Usage: - class Column : public COW<Column> { private: friend class COW<Column>; - /// Leave all constructors in private section. They will be avaliable through 'create' method. Column(); - /// Provide 'clone' method. It can be virtual if you want polymorphic behaviour. virtual Column * clone() const; public: /// Correctly use const qualifiers in your interface. - virtual ~Column() {} }; - * It will provide 'create' and 'mutate' methods. * And 'Ptr' and 'MutablePtr' types. * Ptr is refcounted pointer to immutable object. @@ -63,9 +57,7 @@ Column::Ptr x = Column::create(1); /// Sharing single immutable object in two ptrs. Column::Ptr y = x; - /// Now x and y are shared. - /// Change value of x. { /// Creating mutable ptr. It can clone an object under the hood if it was shared. @@ -75,9 +67,7 @@ /// Assigning pointer 'x' to mutated object. x = std::move(mutate_x); } - /// Now x and y are unshared and have different values. - * Note. You may have heard that COW is bad practice. * Actually it is, if your values are small or if copying is done implicitly. * This is the case for string implementations. @@ -120,20 +110,28 @@ protected: intrusive_ptr() : t(nullptr) {} intrusive_ptr(T* t, bool add_ref = true) : t(t) { - if (t && add_ref) ((std::remove_const_t<T>*)t)->add_ref(); + if (t && add_ref) { + ((std::remove_const_t<T>*)t)->add_ref(); + } } template <typename U> intrusive_ptr(intrusive_ptr<U> const& rhs) : t(rhs.get()) { - if (t) ((std::remove_const_t<T>*)t)->add_ref(); + if (t) { + ((std::remove_const_t<T>*)t)->add_ref(); + } } intrusive_ptr(intrusive_ptr const& rhs) : t(rhs.get()) { - if (t) ((std::remove_const_t<T>*)t)->add_ref(); + if (t) { + ((std::remove_const_t<T>*)t)->add_ref(); + } } ~intrusive_ptr() { - if (t) ((std::remove_const_t<T>*)t)->release_ref(); + if (t) { + ((std::remove_const_t<T>*)t)->release_ref(); + } } template <typename U> @@ -313,10 +311,11 @@ public: protected: MutablePtr shallow_mutate() const { - if (this->use_count() > 1) + if (this->use_count() > 1) { return derived()->clone(); - else + } else { return assume_mutable(); + } } public: diff --git a/be/src/vec/common/string_ref.h b/be/src/vec/common/string_ref.h index 8f83d072ad..fc8fe5187b 100644 --- a/be/src/vec/common/string_ref.h +++ b/be/src/vec/common/string_ref.h @@ -29,6 +29,7 @@ #include "gutil/hash/city.h" #include "gutil/hash/hash128to64.h" #include "udf/udf.h" +#include "util/slice.h" #include "vec/common/unaligned.h" #include "vec/core/types.h" @@ -54,6 +55,7 @@ struct StringRef { std::string to_string() const { return std::string(data, size); } std::string_view to_string_view() const { return std::string_view(data, size); } + doris::Slice to_slice() const { return doris::Slice(data, size); } // this is just for show, eg. print data to error log, to avoid print large string. std::string to_prefix(size_t length) const { return std::string(data, std::min(length, size)); } diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index ea3300eaf3..7da3d86d1e 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -48,7 +48,11 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state, const doris::RowDescriptor& row_desc, const std::shared_ptr<doris::MemTracker>& tracker) { _prepared = true; - _mem_tracker = tracker; + if (!tracker) { + _mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); + } else { + _mem_tracker = tracker; + } SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); _pool.reset(new MemPool(_mem_tracker.get())); return _root->prepare(state, row_desc, this); @@ -78,7 +82,7 @@ void VExprContext::close(doris::RuntimeState* state) { _fn_contexts[i]->impl()->close(); } // _pool can be NULL if Prepare() was never called - if (_pool != NULL) { + if (_pool != nullptr) { _pool->free_all(); } _closed = true; @@ -140,7 +144,9 @@ Block VExprContext::get_output_block_after_execute_exprs( for (auto vexpr_ctx : output_vexpr_ctxs) { int result_column_id = -1; status = vexpr_ctx->execute(&tmp_block, &result_column_id); - if (UNLIKELY(!status.ok())) return {}; + if (UNLIKELY(!status)) { + return {}; + } DCHECK(result_column_id != -1); result_columns.emplace_back(tmp_block.get_by_position(result_column_id)); } diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 86a9d086a6..bfa4468916 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -29,7 +29,7 @@ public: VExprContext(VExpr* expr); ~VExprContext(); Status prepare(RuntimeState* state, const RowDescriptor& row_desc, - const std::shared_ptr<MemTracker>& tracker); + const std::shared_ptr<MemTracker>& tracker = nullptr); Status open(RuntimeState* state); void close(RuntimeState* state); Status clone(RuntimeState* state, VExprContext** new_ctx); @@ -61,7 +61,7 @@ public: static Block get_output_block_after_execute_exprs(const std::vector<vectorized::VExprContext*>&, const Block&, Status&); - int get_last_result_column_id() { + int get_last_result_column_id() const { DCHECK(_last_result_column_id != -1); return _last_result_column_id; } diff --git a/be/src/vec/functions/function_hash.cpp b/be/src/vec/functions/function_hash.cpp index 92e2a55827..4a6235c9a6 100644 --- a/be/src/vec/functions/function_hash.cpp +++ b/be/src/vec/functions/function_hash.cpp @@ -223,7 +223,7 @@ struct MurmurHash3Impl32 { }; using FunctionMurmurHash3_32 = FunctionVariadicArgumentsBase<DataTypeInt32, MurmurHash3Impl32>; -void register_function_function_hash(SimpleFunctionFactory& factory) { +void register_function_hash(SimpleFunctionFactory& factory) { factory.register_function<FunctionMurmurHash2_64>(); factory.register_function<FunctionMurmurHash3_32>(); } diff --git a/be/src/vec/functions/function_ifnull.cpp b/be/src/vec/functions/function_ifnull.cpp index bbd66e51e4..1f9c9d48dc 100644 --- a/be/src/vec/functions/function_ifnull.cpp +++ b/be/src/vec/functions/function_ifnull.cpp @@ -21,7 +21,7 @@ #include "function_ifnull.h" namespace doris::vectorized { -void register_function_function_ifnull(SimpleFunctionFactory& factory) { +void register_function_ifnull(SimpleFunctionFactory& factory) { factory.register_function<FunctionIfNull>(); factory.register_alias(FunctionIfNull::name, "nvl"); } diff --git a/be/src/vec/functions/functions_geo.cpp b/be/src/vec/functions/functions_geo.cpp index 71018a9b67..534bc1300e 100644 --- a/be/src/vec/functions/functions_geo.cpp +++ b/be/src/vec/functions/functions_geo.cpp @@ -517,7 +517,7 @@ struct StGeoFromText { } }; -void register_geo_functions(SimpleFunctionFactory& factory) { +void register_function_geo(SimpleFunctionFactory& factory) { factory.register_function<GeoFunction<StPoint>>(); factory.register_function<GeoFunction<StAsText<StAsWktName>>>(); factory.register_function<GeoFunction<StAsText<StAsTextName>>>(); diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 986528f25f..7e56dbac99 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -63,8 +63,8 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory); void register_function_timestamp(SimpleFunctionFactory& factory); void register_function_utility(SimpleFunctionFactory& factory); void register_function_json(SimpleFunctionFactory& factory); -void register_function_function_hash(SimpleFunctionFactory& factory); -void register_function_function_ifnull(SimpleFunctionFactory& factory); +void register_function_hash(SimpleFunctionFactory& factory); +void register_function_ifnull(SimpleFunctionFactory& factory); void register_function_like(SimpleFunctionFactory& factory); void register_function_regexp(SimpleFunctionFactory& factory); void register_function_random(SimpleFunctionFactory& factory); @@ -75,7 +75,7 @@ void register_function_convert_tz(SimpleFunctionFactory& factory); void register_function_least_greast(SimpleFunctionFactory& factory); void register_function_fake(SimpleFunctionFactory& factory); void register_function_array(SimpleFunctionFactory& factory); -void register_geo_functions(SimpleFunctionFactory& factory); +void register_function_geo(SimpleFunctionFactory& factory); void register_function_encryption(SimpleFunctionFactory& factory); void register_function_regexp_extract(SimpleFunctionFactory& factory); @@ -194,8 +194,8 @@ public: register_function_date_time_to_string(instance); register_function_date_time_string_to_string(instance); register_function_json(instance); - register_function_function_hash(instance); - register_function_function_ifnull(instance); + register_function_hash(instance); + register_function_ifnull(instance); register_function_comparison_eq_for_null(instance); register_function_like(instance); register_function_regexp(instance); @@ -210,7 +210,7 @@ public: register_function_regexp_extract(instance); register_function_hex_variadic(instance); register_function_array(instance); - register_geo_functions(instance); + register_function_geo(instance); }); return instance; } diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 62e583849f..aeeb84c679 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -23,16 +23,15 @@ #include "olap/storage_engine.h" #include "runtime/mem_pool.h" #include "runtime/mem_tracker.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" #include "vec/olap/vcollect_iterator.h" namespace doris::vectorized { BlockReader::~BlockReader() { for (int i = 0; i < _agg_functions.size(); ++i) { - AggregateFunctionPtr function = _agg_functions[i]; - AggregateDataPtr place = _agg_places[i]; - function->destroy(place); - delete[] place; + _agg_functions[i]->destroy(_agg_places[i]); + delete[] _agg_places[i]; } } @@ -85,22 +84,11 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) { auto& tablet_schema = tablet()->tablet_schema(); for (auto idx : _agg_columns_idx) { - FieldAggregationMethod agg_method = + AggregateFunctionPtr function = tablet_schema .column(read_params.origin_return_columns->at(_return_columns_loc[idx])) - .aggregation(); - std::string agg_name = - TabletColumn::get_string_by_aggregation_type(agg_method) + AGG_READER_SUFFIX; - std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), - [](unsigned char c) { return std::tolower(c); }); - - // create aggregate function - DataTypes argument_types; - argument_types.push_back(_next_row.block->get_data_type(idx)); - Array params; - AggregateFunctionPtr function = AggregateFunctionSimpleFactory::instance().get( - agg_name, argument_types, params, - _next_row.block->get_data_type(idx)->is_nullable()); + .get_aggregate_function({_next_row.block->get_data_type(idx)}, + vectorized::AGG_READER_SUFFIX); DCHECK(function != nullptr); _agg_functions.push_back(function); // create aggregate data diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index 5da9641fcb..604d2a02ec 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -123,7 +123,9 @@ std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_colu // class OlapBlockDataConvertor::OlapColumnDataConvertorBase void OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column( const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) { - assert(num_rows > 0 && row_pos + num_rows <= typed_column.column->size()); + DCHECK(row_pos + num_rows <= typed_column.column->size()) + << "row_pos=" << row_pos << ", num_rows=" << num_rows + << ", typed_column.column->size()=" << typed_column.column->size(); _typed_column = typed_column; _row_pos = row_pos; _num_rows = num_rows; @@ -358,55 +360,21 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorChar::convert_to_olap() { column_string = assert_cast<const vectorized::ColumnString*>(_typed_column.column.get()); } - assert(column_string); - // If column_string is not padded to full, we should do padding here. if (should_padding(column_string, _length)) { _column = clone_and_padding(column_string, _length); column_string = assert_cast<const vectorized::ColumnString*>(_column.get()); } - const ColumnString::Char* char_data = column_string->get_chars().data(); - const ColumnString::Offset* offset_cur = column_string->get_offsets().data() + _row_pos; - const ColumnString::Offset* offset_end = offset_cur + _num_rows; - Slice* slice = _slice.data(); - size_t string_length; - size_t string_offset = *(offset_cur - 1); - [[maybe_unused]] size_t slice_size = _length; - if (_nullmap) { - const UInt8* nullmap_cur = _nullmap + _row_pos; - while (offset_cur != offset_end) { - if (!*nullmap_cur) { - string_length = *offset_cur - string_offset - 1; - assert(string_length <= slice_size); - slice->data = (char*)char_data + string_offset; - slice->size = string_length; - } else { - // TODO: this may not be necessary, check and remove later - slice->data = nullptr; - slice->size = 0; - } - - string_offset = *offset_cur; - ++nullmap_cur; - ++slice; - ++offset_cur; + for (size_t i = 0; i < _num_rows; i++) { + if (!_nullmap || !_nullmap[i + _row_pos]) { + _slice[i] = column_string->get_data_at(i + _row_pos).to_slice(); + DCHECK(_slice[i].size == _length) + << "char type data length not equal to schema, schema=" << _length + << ", real=" << _slice[i].size; } - assert(nullmap_cur == _nullmap + _row_pos + _num_rows && slice == _slice.get_end_ptr()); - } else { - while (offset_cur != offset_end) { - string_length = *offset_cur - string_offset - 1; - assert(string_length <= slice_size); - - slice->data = (char*)char_data + string_offset; - slice->size = string_length; - - string_offset = *offset_cur; - ++slice; - ++offset_cur; - } - assert(slice == _slice.get_end_ptr()); } + return Status::OK(); } diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h index d0f5d01525..e11fb672b2 100644 --- a/be/src/vec/olap/olap_data_convertor.h +++ b/be/src/vec/olap/olap_data_convertor.h @@ -50,6 +50,7 @@ private: class OlapColumnDataConvertorBase : public IOlapColumnDataAccessor { public: OlapColumnDataConvertorBase() = default; + ~OlapColumnDataConvertorBase() override = default; OlapColumnDataConvertorBase(const OlapColumnDataConvertorBase&) = delete; OlapColumnDataConvertorBase& operator=(const OlapColumnDataConvertorBase&) = delete; OlapColumnDataConvertorBase(OlapColumnDataConvertorBase&&) = delete; @@ -120,6 +121,11 @@ private: column->offsets[i] = (i + 1) * (padding_length + 1); auto str = input->get_data_at(i); + + DCHECK(str.size <= padding_length) + << "char type data length over limit, padding_length=" << padding_length + << ", real=" << str.size; + if (str.size) { memcpy(padded_column->chars.data() + i * (padding_length + 1), str.data, str.size); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org