This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 7524c5e [Memory Engine] Add MemSubTablet, MemTablet, WriteTx, PartialRowBatch (#3637) 7524c5e is described below commit 7524c5ef63becb184583dae4111a19bcb0b43e22 Author: Binglin Chang <decst...@gmail.com> AuthorDate: Sat May 30 10:33:10 2020 +0800 [Memory Engine] Add MemSubTablet, MemTablet, WriteTx, PartialRowBatch (#3637) --- be/src/olap/base_tablet.cpp | 13 +- be/src/olap/base_tablet.h | 14 +- be/src/olap/memory/CMakeLists.txt | 3 + be/src/olap/memory/common.h | 1 + be/src/olap/memory/mem_sub_tablet.cpp | 246 ++++++++++++++++++ be/src/olap/memory/mem_sub_tablet.h | 120 +++++++++ be/src/olap/memory/mem_tablet.cpp | 35 ++- be/src/olap/memory/mem_tablet.h | 47 +++- be/src/olap/memory/partial_row_batch.cpp | 274 +++++++++++++++++++++ be/src/olap/memory/partial_row_batch.h | 172 +++++++++++++ be/src/olap/memory/schema.cpp | 72 ++++++ be/src/olap/memory/schema.h | 38 ++- .../olap/memory/{mem_tablet.cpp => write_txn.cpp} | 16 +- be/src/olap/memory/{mem_tablet.h => write_txn.h} | 35 ++- be/src/olap/tablet.cpp | 9 - be/src/olap/tablet.h | 3 +- be/src/util/time.h | 4 + be/test/olap/CMakeLists.txt | 1 + be/test/olap/memory/partial_row_batch_test.cpp | 111 +++++++++ be/test/olap/memory/schema_test.cpp | 22 +- 20 files changed, 1199 insertions(+), 37 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 41aa93a..368f5ed 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-#include "base_tablet.h" +#include "olap/base_tablet.h" +#include "util/path_util.h" +#include "olap/data_dir.h" namespace doris { @@ -24,6 +26,7 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _tablet_meta(tablet_meta), _schema(tablet_meta->tablet_schema()), _data_dir(data_dir) { + _gen_tablet_path(); } BaseTablet::~BaseTablet() { @@ -40,4 +43,12 @@ OLAPStatus BaseTablet::set_tablet_state(TabletState state) { return OLAP_SUCCESS; } +void BaseTablet::_gen_tablet_path() { + std::string path = _data_dir->path() + DATA_PREFIX; + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id())); + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id())); + path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash())); + _tablet_path = path; +} + } /* namespace doris */ diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 34020eb..f3b0c2d 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -19,12 +19,15 @@ #define DORIS_BE_SRC_OLAP_BASE_TABLET_H #include <memory> + #include "olap/olap_define.h" #include "olap/tablet_meta.h" #include "olap/utils.h" namespace doris { +class DataDir; + // Base class for all tablet classes, currently only olap/Tablet and // olap/memory/MemTablet. 
// The fields and methods in this class is not final, it will change as memory @@ -57,10 +60,13 @@ public: inline void set_creation_time(int64_t creation_time); inline bool equal(int64_t tablet_id, int32_t schema_hash); - // propreties encapsulated in TabletSchema + // properties encapsulated in TabletSchema inline const TabletSchema& tablet_schema() const; protected: + void _gen_tablet_path(); + +protected: TabletState _state; TabletMetaSharedPtr _tablet_meta; TabletSchema _schema; @@ -72,7 +78,6 @@ private: DISALLOW_COPY_AND_ASSIGN(BaseTablet); }; - inline DataDir* BaseTablet::data_dir() const { return _data_dir; } @@ -99,9 +104,8 @@ inline int64_t BaseTablet::table_id() const { inline const std::string BaseTablet::full_name() const { std::stringstream ss; - ss << _tablet_meta->tablet_id() - << "." << _tablet_meta->schema_hash() - << "." << _tablet_meta->tablet_uid().to_string(); + ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "." + << _tablet_meta->tablet_uid().to_string(); return ss.str(); } diff --git a/be/src/olap/memory/CMakeLists.txt b/be/src/olap/memory/CMakeLists.txt index 9de9095..b552dfe 100644 --- a/be/src/olap/memory/CMakeLists.txt +++ b/be/src/olap/memory/CMakeLists.txt @@ -29,5 +29,8 @@ add_library(Memory STATIC delta_index.cpp hash_index.cpp mem_tablet.cpp + mem_sub_tablet.cpp + partial_row_batch.cpp schema.cpp + write_txn.cpp ) diff --git a/be/src/olap/memory/common.h b/be/src/olap/memory/common.h index ab952b9..2185dd2 100644 --- a/be/src/olap/memory/common.h +++ b/be/src/olap/memory/common.h @@ -26,6 +26,7 @@ #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/types.h" +#include "util/time.h" namespace doris { namespace memory { diff --git a/be/src/olap/memory/mem_sub_tablet.cpp b/be/src/olap/memory/mem_sub_tablet.cpp new file mode 100644 index 0000000..8bbdf5f --- /dev/null +++ b/be/src/olap/memory/mem_sub_tablet.cpp @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under 
one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/mem_sub_tablet.h" + +#include "olap/memory/column.h" +#include "olap/memory/column_reader.h" +#include "olap/memory/column_writer.h" +#include "olap/memory/hash_index.h" +#include "olap/memory/partial_row_batch.h" +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +Status MemSubTablet::create(uint64_t version, const Schema& schema, + std::unique_ptr<MemSubTablet>* ret) { + std::unique_ptr<MemSubTablet> tmp(new MemSubTablet()); + tmp->_versions.reserve(64); + tmp->_versions.emplace_back(version, 0); + tmp->_columns.resize(schema.cid_size()); + for (size_t i = 0; i < schema.num_columns(); i++) { + // TODO: support storage_type != c.type + auto& c = *schema.get(i); + if (!supported(c.type())) { + return Status::NotSupported("column type not supported"); + } + tmp->_columns[c.cid()].reset(new Column(c, c.type(), version)); + } + tmp.swap(*ret); + return Status::OK(); +} + +MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {} + +MemSubTablet::~MemSubTablet() {} + +Status MemSubTablet::get_size(uint64_t version, size_t* size) const { + std::lock_guard<std::mutex> lg(_lock); + if (version == static_cast<uint64_t>(-1)) { + // get latest + *size = _versions.back().size; + 
return Status::OK(); + } + if (_versions[0].version > version) { + return Status::NotFound("get_size failed, version too old"); + } + for (size_t i = 1; i < _versions.size(); i++) { + if (_versions[i].version > version) { + *size = _versions[i - 1].size; + return Status::OK(); + } + } + *size = _versions.back().size; + return Status::OK(); +} + +Status MemSubTablet::read_column(uint64_t version, uint32_t cid, + std::unique_ptr<ColumnReader>* reader) { + scoped_refptr<Column> cl; + { + std::lock_guard<std::mutex> lg(_lock); + if (cid < _columns.size()) { + cl = _columns[cid]; + } + } + if (cl.get() != nullptr) { + return Status::NotFound("column not found"); + } + return cl->create_reader(version, reader); +} + +Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) { + *index = _index; + return Status::OK(); +} + +Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) { + if (_schema != nullptr) { + return Status::InternalError("Another write is in-progress or error occurred"); + } + _schema = *schema; + _row_size = latest_size(); + _write_index = _index; + _writers.clear(); + _writers.resize(_columns.size()); + // precache key columns + for (size_t i = 0; i < _schema->num_key_columns(); i++) { + uint32_t cid = _schema->get(i)->cid(); + if (_writers[cid] != nullptr) { + RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid])); + } + } + _temp_hash_entries.reserve(8); + + // setup stats + _write_start = GetMonoTimeSecondsAsDouble(); + _num_insert = 0; + _num_update = 0; + _num_update_cell = 0; + return Status::OK(); +} + +Status MemSubTablet::apply_partial_row_batch(PartialRowBatch* batch) { + while (true) { + bool has_row = false; + RETURN_IF_ERROR(batch->next_row(&has_row)); + if (!has_row) { + break; + } + DCHECK_GE(batch->cur_row_cell_size(), 1); + const ColumnSchema* dsc; + const void* key; + // get key column and find in hash index + // TODO: support multi-column row key + batch->cur_row_get_cell(0, &dsc, &key); + ColumnWriter* 
keyw = _writers[1].get(); + // find candidate rowids, and check equality + uint64_t hashcode = keyw->hashcode(key, 0); + _temp_hash_entries.clear(); + uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries); + uint32_t rid = -1; + for (size_t i = 0; i < _temp_hash_entries.size(); i++) { + uint32_t test_rid = _temp_hash_entries[i]; + if (keyw->equals(test_rid, key, 0)) { + rid = test_rid; + break; + } + } + // if rowkey not found, do insertion/append + if (rid == -1) { + rid = _row_size; + // add all columns + //DLOG(INFO) << StringPrintf"insert rid=%u", rid); + for (size_t i = 0; i < batch->cur_row_cell_size(); i++) { + const void* data; + RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data)); + uint32_t cid = dsc->cid(); + if (_writers[cid] == nullptr) { + RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid])); + } + RETURN_IF_ERROR(_writers[cid]->insert(rid, data)); + } + _write_index->set(newslot, hashcode, rid); + _row_size++; + if (_write_index->need_rebuild()) { + scoped_refptr<HashIndex> new_index; + // TODO: trace memory usage + size_t new_capacity = _row_size * 2; + while (true) { + new_index = rebuild_hash_index(new_capacity); + if (new_index.get() != nullptr) { + break; + } else { + new_capacity += 1 << 16; + } + } + _write_index = new_index; + } + _num_insert++; + } else { + // rowkey found, do update + // add non-key columns + for (size_t i = 1; i < batch->cur_row_cell_size(); i++) { + const void* data; + RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data)); + uint32_t cid = dsc->cid(); + if (cid > _schema->num_key_columns()) { + if (_writers[cid] == nullptr) { + RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid])); + } + RETURN_IF_ERROR(_writers[cid]->update(rid, data)); + } + } + _num_update++; + _num_update_cell += batch->cur_row_cell_size() - 1; + } + } + return Status::OK(); +} + +Status MemSubTablet::commit_write(uint64_t version) { + for (size_t cid = 0; cid < _writers.size(); cid++) { + if (_writers[cid] != 
nullptr) { + // Should not fail in normal cases, fatal error if commit failed + RETURN_IF_ERROR(_writers[cid]->finalize(version)); + } + } + { + std::lock_guard<std::mutex> lg(_lock); + if (_index != _write_index) { + _index = _write_index; + } + for (size_t cid = 0; cid < _writers.size(); cid++) { + if (_writers[cid] != nullptr) { + // Should not fail in normal cases, fatal error if commit failed + RETURN_IF_ERROR(_writers[cid]->get_new_column(&_columns[cid])); + } + } + _versions.emplace_back(version, _row_size); + } + _write_index.reset(); + _writers.clear(); + _schema = nullptr; + LOG(INFO) << StringPrintf("commit writetxn(insert=%zu update=%zu update_cell=%zu) %.3lfs", + _num_insert, _num_update, _num_update_cell, + GetMonoTimeSecondsAsDouble() - _write_start); + return Status::OK(); +} + +scoped_refptr<HashIndex> MemSubTablet::rebuild_hash_index(size_t new_capacity) { + double t0 = GetMonoTimeSecondsAsDouble(); + ColumnWriter* keyw = _writers[1].get(); + scoped_refptr<HashIndex> hi(new HashIndex(new_capacity)); + for (size_t i = 0; i < _row_size; i++) { + const void* data = keyw->get(i); + DCHECK(data); + uint64_t hashcode = keyw->hashcode(data, 0); + if (!hi->add(hashcode, i)) { + double t1 = GetMonoTimeSecondsAsDouble(); + LOG(INFO) << StringPrintf("Rebuild hash index %zu failed time: %.3lfs, expand", + new_capacity, t1 - t0); + return scoped_refptr<HashIndex>(); + } + } + double t1 = GetMonoTimeSecondsAsDouble(); + LOG(INFO) << StringPrintf("Rebuild hash index %zu time: %.3lfs", new_capacity, t1 - t0); + return hi; +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_sub_tablet.h b/be/src/olap/memory/mem_sub_tablet.h new file mode 100644 index 0000000..68ddc3d --- /dev/null +++ b/be/src/olap/memory/mem_sub_tablet.h @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +class HashIndex; +class ColumnReader; +class PartialRowBatch; +class Column; +class ColumnWriter; + +// A MemTablet can contain multiple MemSubTablets (currently only one). +// MemSubTablet hold a HashIndex and a collection of columns. +// It supports single-writer multi-reader concurrently. 
+// +// Example read usage: +// std::unique_ptr<ColumnReader> reader; +// sub_tablet->read_column(version, cid, &reader); +// // read(scan/get) cells using reader +// +// Example write usage: +// WrintTxn* wtxn; +// MemSubTablet* sub_tablet; +// sub_tablet->begin_write(current_schema); +// for (size_t i = 0; i < wtxn->batch_size(); i++) { +// auto batch = wtxn->get_batch(i); +// PartialRowReader reader(*batch); +// for (size_t j = 0; j < reader.size(); j++) { +// RETURN_IF_ERROR(reader.read(j)); +// RETURN_IF_ERROR(_sub_tablet->apply_partial_row(reader)); +// } +// } +// sub_tablet->commit_write(version); +class MemSubTablet { +public: + // Create a new empty MemSubTablet, with specified schema and initial version + static Status create(uint64_t version, const Schema& schema, + std::unique_ptr<MemSubTablet>* ret); + + // Destructor + ~MemSubTablet(); + + // Return number of rows of the latest version, including rows marked as delete + size_t latest_size() const { return _versions.back().size; } + + // Return number of rows of the specified version, including rows marked as delete + Status get_size(uint64_t version, size_t* size) const; + + // Read a column with specified by column id(cid) and version, return a column reader + Status read_column(uint64_t version, uint32_t cid, std::unique_ptr<ColumnReader>* reader); + + // Get a hash index read reference to read + Status get_index_to_read(scoped_refptr<HashIndex>* index); + + // Start a exclusive write batch + // Note: caller should make sure schema is valid during write + Status begin_write(scoped_refptr<Schema>* schema); + + // Apply a partial row to this MemSubTablet + Status apply_partial_row_batch(PartialRowBatch* batch); + + // Finalize the whole write batch, with specified version + Status commit_write(uint64_t version); + +private: + MemSubTablet(); + scoped_refptr<HashIndex> rebuild_hash_index(size_t new_capacity); + + mutable std::mutex _lock; + scoped_refptr<HashIndex> _index; + struct VersionInfo { + 
VersionInfo(uint64_t version, uint64_t size) : version(version), size(size) {} + uint64_t version = 0; + uint64_t size = 0; + }; + std::vector<VersionInfo> _versions; + // Map from cid to column + std::vector<scoped_refptr<Column>> _columns; + + // Temporary write state variables + scoped_refptr<Schema> _schema; + size_t _row_size = 0; + // If a copy-on-write is performed on HashIndex, this variable holds + // the new reference, otherwise it holds the same reference as _index + scoped_refptr<HashIndex> _write_index; + // Map from cid to current writers + std::vector<std::unique_ptr<ColumnWriter>> _writers; + // Temporary variable to reuse hash entry vector + std::vector<uint32_t> _temp_hash_entries; + // Write stats + double _write_start = 0; + size_t _num_insert = 0; + size_t _num_update = 0; + size_t _num_update_cell = 0; + + DISALLOW_COPY_AND_ASSIGN(MemSubTablet); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/mem_tablet.cpp index 5c2edd8..a75e30b 100644 --- a/be/src/olap/memory/mem_tablet.cpp +++ b/be/src/olap/memory/mem_tablet.cpp @@ -17,13 +17,46 @@ #include "olap/memory/mem_tablet.h" +#include "olap/memory/mem_sub_tablet.h" +#include "olap/memory/write_txn.h" + namespace doris { namespace memory { MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) - : BaseTablet(tablet_meta, data_dir) {} + : BaseTablet(tablet_meta, data_dir) { + _mem_schema.reset(new Schema(_schema)); +} MemTablet::~MemTablet() {} +std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, + DataDir* data_dir) { + return std::make_shared<MemTablet>(tablet_meta, data_dir); +} + +Status MemTablet::init() { + return MemSubTablet::create(0, *_mem_schema.get(), &_sub_tablet); +} + +Status MemTablet::scan(std::unique_ptr<ScanSpec>&& spec, std::unique_ptr<MemTabletScan>* scan) { + return Status::NotSupported("scan not supported"); +} + +Status 
MemTablet::create_write_txn(std::unique_ptr<WriteTxn>* wtxn) { + wtxn->reset(new WriteTxn(&_mem_schema)); + return Status::OK(); +} + +Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) { + std::lock_guard<std::mutex> lg(_write_lock); + RETURN_IF_ERROR(_sub_tablet->begin_write(&_mem_schema)); + for (size_t i = 0; i < wtxn->batch_size(); i++) { + auto batch = wtxn->get_batch(i); + RETURN_IF_ERROR(_sub_tablet->apply_partial_row_batch(batch)); + } + return _sub_tablet->commit_write(version); +} + } // namespace memory } // namespace doris diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h index 7efc945..cfc8c3c 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/mem_tablet.h @@ -18,22 +18,67 @@ #pragma once #include "olap/base_tablet.h" +#include "olap/memory/schema.h" namespace doris { namespace memory { +class MemSubTablet; +class ScanSpec; +class MemTabletScan; +class WriteTxn; + // Tablet class for memory-optimized storage engine. // // It stores all its data in-memory, and is designed for tables with // frequent updates. // -// TODO: This is just a skeleton, will add implementation in the future. +// By design, MemTablet stores all the schema versions together inside a single +// MemTablet, while olap/Tablet generate a new Tablet after schema change. so their +// behaviors are not compatible, we will address this issue latter after adding schema +// change support, currently MemTablet does not support schema change(only have single +// version of schema). +// +// TODO: will add more functionality as project evolves. 
class MemTablet : public BaseTablet { public: + static std::shared_ptr<MemTablet> create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, + DataDir* data_dir = nullptr); + MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); + virtual ~MemTablet(); + // Initialize + Status init(); + + // Scan the tablet, return a MemTabletScan object scan, user can specify projections, + // predicates and aggregations using ScanSpec, currently only support full scan with + // projection. + // + // Note: thread-safe, supports multi-reader concurrency. + Status scan(std::unique_ptr<ScanSpec>&& spec, std::unique_ptr<MemTabletScan>* scan); + + // Create a write transaction + // + // Note: Thread-safe, can have multiple writetxn at the same time. + Status create_write_txn(std::unique_ptr<WriteTxn>* wtxn); + + // Apply a write transaction and commit as the specified version + // + // Note: commit is done sequentially, protected by internal write lock + Status commit_write_txn(WriteTxn* wtxn, uint64_t version); + private: + // memory::Schema is used internally rather than TabletSchema, so we need an extra + // copy of _schema with type memory::Schema. + scoped_refptr<Schema> _mem_schema; + + // TODO: support multiple sub-tablets in the future + std::unique_ptr<MemSubTablet> _sub_tablet; + + std::mutex _write_lock; + DISALLOW_COPY_AND_ASSIGN(MemTablet); }; diff --git a/be/src/olap/memory/partial_row_batch.cpp b/be/src/olap/memory/partial_row_batch.cpp new file mode 100644 index 0000000..b7d2b5e --- /dev/null +++ b/be/src/olap/memory/partial_row_batch.cpp @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/partial_row_batch.h" + +#include "util/bitmap.h" + +namespace doris { +namespace memory { + +// Methods for PartialRowBatch + +PartialRowBatch::PartialRowBatch(scoped_refptr<Schema>* schema) + : _schema(*schema), _bit_set_size(_schema->cid_size()) { + _cells.reserve(_schema->num_columns()); +} + +PartialRowBatch::~PartialRowBatch() {} + +Status PartialRowBatch::load(std::vector<uint8_t>&& buffer) { + _buffer = std::move(buffer); + _pos = _buffer.data(); + _row_size = *reinterpret_cast<const uint64_t*>(_pos); + _pos += sizeof(uint64_t); + _next_row = 0; + return Status::OK(); +} + +Status PartialRowBatch::next_row(bool* has_row) { + DCHECK_LE(_next_row, _row_size); + *has_row = false; + if (_next_row == _row_size) { + return Status::OK(); + } + DCHECK_LE(_pos, _buffer.data() + _buffer.size()); + _cells.clear(); + uint32_t row_bsize = *reinterpret_cast<const uint32_t*>(_pos); + _pos += sizeof(uint32_t); + const uint8_t* cur = _pos; + size_t bit_all_size = *reinterpret_cast<const uint16_t*>(cur); + cur += 2; + DCHECK_LE(bit_all_size, 65535); + const uint8_t* bitvec = cur; + cur += bit_all_size; + size_t cur_nullable_idx = _bit_set_size; + if (BitmapTest(bitvec, 0)) { + _delete = true; + } + size_t cid = 1; + while (BitmapFindFirstSet(bitvec, cid, _bit_set_size, &cid)) { + const ColumnSchema* cs = _schema->get_by_cid(cid); + DCHECK(cs); + if (cs->is_nullable()) { + if (BitmapTest(bitvec, cur_nullable_idx)) { + // is null + _cells.emplace_back(cid, nullptr); + } else { + // not null + _cells.emplace_back(cid, cur); + } + 
cur_nullable_idx++; + } else { + _cells.emplace_back(cid, cur); + } + const uint8_t* pdata = _cells.back().data; + if (pdata != nullptr) { + size_t bsize = _schema->get_column_byte_size(cid); + if (bsize == 0) { + return Status::NotSupported("varlen column type not supported"); + // size_t sz = *(uint16_t*)cur; + // cur += (sz + 2); + } else { + cur += bsize; + } + } + cid++; + } + if (_pos + row_bsize != cur) { + return Status::InternalError("PartialRowBatch data corruption"); + } + _pos = cur; + *has_row = true; + _next_row++; + return Status::OK(); +} + +Status PartialRowBatch::cur_row_get_cell(size_t idx, const ColumnSchema** cs, + const void** data) const { + if (idx >= _cells.size()) { + return Status::InvalidArgument("get_cell: idx exceed cells size"); + } + auto& cell = _cells[idx]; + *cs = _schema->get_by_cid(cell.cid); + *data = cell.data; + return Status::OK(); +} + +// Methods for PartialRowWriter + +PartialRowWriter::PartialRowWriter(scoped_refptr<Schema>* schema) + : _schema(*schema), _bit_set_size(_schema->cid_size()), _bit_nullable_size(0) { + _temp_cells.resize(_schema->cid_size()); +} + +Status PartialRowWriter::start_batch(size_t row_capacity, size_t byte_capacity) { + _row_size = 0; + _row_capacity = row_capacity; + // reserve space for _row_size + _buffer.resize(sizeof(uint64_t)); + _buffer.reserve(byte_capacity); + return Status::OK(); +} + +PartialRowWriter::~PartialRowWriter() {} + +Status PartialRowWriter::start_row() { + if (_row_size >= _row_capacity) { + return Status::InvalidArgument("over capacity"); + } + _bit_nullable_size = 0; + memset(&(_temp_cells[0]), 0, sizeof(CellInfo) * _temp_cells.size()); + return Status::OK(); +} + +Status PartialRowWriter::end_row() { + if (_row_size >= _row_capacity) { + return Status::InvalidArgument("over capacity"); + } + size_t row_byte_size = byte_size(); + size_t new_size = _buffer.size() + row_byte_size + 4; + size_t old_size = _buffer.size(); + if (new_size > _buffer.capacity()) { + return 
Status::InvalidArgument("over capacity"); + } + _buffer.resize(new_size); + uint8_t* pos = _buffer.data() + old_size; + *reinterpret_cast<uint32_t*>(pos) = row_byte_size; + pos += sizeof(uint32_t); + Status st = write(&pos); + DCHECK_EQ(pos, _buffer.data() + new_size); + if (!st.ok()) { + _buffer.resize(old_size); + return st; + } + _row_size++; + return Status::OK(); +} + +Status PartialRowWriter::set(const ColumnSchema* cs, uint32_t cid, const void* data) { + if (cs->is_nullable() || (data != nullptr)) { + if (cs->is_nullable() && !_temp_cells[cid].isnullable) { + _bit_nullable_size++; + } + _temp_cells[cid].isnullable = cs->is_nullable(); + _temp_cells[cid].isset = 1; + _temp_cells[cid].data = reinterpret_cast<const uint8_t*>(data); + } else { + return Status::InvalidArgument("not nullable column set to null"); + } + return Status::OK(); +} + +Status PartialRowWriter::set(const string& col, const void* data) { + auto cs = _schema->get_by_name(col); + if (cs == nullptr) { + return Status::NotFound("col name not found"); + } + return set(cs, cs->cid(), data); +} + +Status PartialRowWriter::set(uint32_t cid, const void* data) { + auto cs = _schema->get_by_cid(cid); + if (cs == nullptr) { + return Status::NotFound("cid not found"); + } + return set(cs, cs->cid(), data); +} + +Status PartialRowWriter::set_delete() { + // TODO: support delete + // _temp_cells[0].isset = 1; + return Status::NotSupported("delete not supported"); +} + +size_t PartialRowWriter::byte_size() const { + // TODO: support delete + size_t bit_all_size = num_block(_bit_set_size + _bit_nullable_size, 8); + size_t data_size = 2 + bit_all_size; + for (size_t i = 1; i < _temp_cells.size(); i++) { + if (_temp_cells[i].data != nullptr) { + size_t bsize = _schema->get_column_byte_size(i); + if (bsize == 0) { + LOG(FATAL) << "varlen column type not supported"; + //data_size += 2 + reinterpret_cast<Slice*>(_temp_cells[i].data)->size(); + } else { + data_size += bsize; + } + } + } + return data_size; +} + 
+Status PartialRowWriter::write(uint8_t** ppos) { + size_t bit_all_size = num_block(_bit_set_size + _bit_nullable_size, 8); + if (bit_all_size >= 65536) { + return Status::NotSupported("too many columns"); + } + // using reference is more convenient + uint8_t*& pos = *ppos; + *reinterpret_cast<uint16_t*>(pos) = (uint16_t)bit_all_size; + pos += 2; + uint8_t* bitvec = pos; + pos += bit_all_size; + memset(bitvec, 0, bit_all_size); + if (_temp_cells[0].isset) { + // deleted + BitmapSet(bitvec, 0); + } + size_t cur_nullable_idx = _bit_set_size; + for (size_t i = 1; i < _temp_cells.size(); i++) { + if (_temp_cells[i].isset) { + BitmapSet(bitvec, i); + if (_temp_cells[i].isnullable) { + if (_temp_cells[i].data == nullptr) { + BitmapSet(bitvec, cur_nullable_idx); + } + cur_nullable_idx++; + } + const uint8_t* pdata = _temp_cells[i].data; + if (pdata != nullptr) { + size_t bsize = _schema->get_column_byte_size(i); + if (bsize == 0) { + return Status::NotSupported("varlen column type not supported"); + // Some incomplete code to write string(Slice), may be useful in future + // size_t sz = ((Slice*)pdata)->size(); + // *(uint16_t*)pos = (uint16_t)sz; + // pos += 2; + // memcpy(pos, ((Slice*)pdata)->data(), sz); + // pos += sz; + } else { + memcpy(pos, _temp_cells[i].data, bsize); + pos += bsize; + } + } + } else if (i <= _schema->num_key_columns()) { + return Status::InvalidArgument("build without key columns"); + } + } + return Status::OK(); +} + +Status PartialRowWriter::finish_batch(vector<uint8_t>* buffer) { + *reinterpret_cast<uint64_t*>(_buffer.data()) = _row_size; + _buffer.swap(*buffer); + _row_size = 0; + return Status::OK(); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/partial_row_batch.h b/be/src/olap/memory/partial_row_batch.h new file mode 100644 index 0000000..893f865 --- /dev/null +++ b/be/src/olap/memory/partial_row_batch.h @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +// A chunk of memory that stores a batch of serialized partial rows +// User can iterate through all the partial rows, get each partial row's cells. +// +// Serialization format for a batch: +// 4 byte len | serialized partial row +// 4 byte len | serialized partial row +// ... +// 4 byte len | serialized partial row +// +// Serialization format for a partial row +// bit vector(se + null) byte size (2 byte) | +// bit vector mark set cells | +// bit vector mark nullable cells' null value | +// 8bit padding +// serialized not null cells +// +// Example usage: +// PartialRowBatch rb(&schema); +// rb.load(buffer); +// while (true) { +// bool has; +// rb.next(&has); +// if (!has) break; +// for (size_t j=0; j < reader.cell_size(); j++) { +// const ColumnSchema* cs = nullptr; +// const void* data = nullptr; +// // get column cell type and data +// rb.get_cell(j, &cs, &data); +// } +// } +// +// Note: currently only fixed length column types are supported. All length and scalar types store +// in native byte order(little endian in x86-64). 
+// +// Note: The serialization format is simple, it only provides basic functionalities +// so we can quickly complete the whole create/read/write pipeline. The format may change +// as the project evolves. +class PartialRowBatch { +public: + explicit PartialRowBatch(scoped_refptr<Schema>* schema); + ~PartialRowBatch(); + + const Schema& schema() const { return *_schema.get(); } + + // Load from a serialized buffer + Status load(std::vector<uint8_t>&& buffer); + + // Return row count in this batch + size_t row_size() const { return _row_size; } + + // Iterate to next row, mark has_row to false if there is no more rows + Status next_row(bool* has_row); + + // Get row operation cell count + size_t cur_row_cell_size() const { return _cells.size(); } + // Get row operation cell by index idx, return ColumnSchema and data pointer + Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** data) const; + +private: + scoped_refptr<Schema> _schema; + + bool _delete = false; + size_t _bit_set_size = 0; + struct CellInfo { + CellInfo(uint32_t cid, const void* data) + : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {} + uint32_t cid = 0; + const uint8_t* data = nullptr; + }; + vector<CellInfo> _cells; + + size_t _next_row = 0; + size_t _row_size = 0; + const uint8_t* _pos = nullptr; + std::vector<uint8_t> _buffer; +}; + +// Writer for PartialRowBatch +// +// Example usage: +// scoped_refptr<Schema> sc; +// Schema::create("id int,uv int,pv int,city tinyint null", &sc); +// PartialRowWriter writer(*sc.get()); +// writer.start_batch(); +// for (auto& row : rows) { +// writer.start_row(); +// writer.set("column_name", value); +// ... 
+// writer.set(column_id, value); +// writer.end_row(); +// } +// vector<uint8_t> buffer; +// writer.finish_batch(&buffer); +class PartialRowWriter { +public: + static const size_t DEFAULT_BYTE_CAPACITY = 1 << 20; + static const size_t DEFAULT_ROW_CAPACIT = 1 << 16; + + explicit PartialRowWriter(scoped_refptr<Schema>* schema); + ~PartialRowWriter(); + + Status start_batch(size_t row_capacity = DEFAULT_ROW_CAPACIT, + size_t byte_capacity = DEFAULT_BYTE_CAPACITY); + + // Start writing a new row + Status start_row(); + + // Set cell value by column name + // param data's memory must remain valid until end_row() is called + Status set(const string& col, const void* data); + + // Set cell value by column id + // param data's memory must remain valid until end_row() is called + Status set(uint32_t cid, const void* data); + + // Mark this row as a delete operation + Status set_delete(); + + // Finish writing a row + Status end_row(); + + // Finish writing the whole PartialRowBatch, return a serialized buffer + Status finish_batch(vector<uint8_t>* buffer); + +private: + Status set(const ColumnSchema* cs, uint32_t cid, const void* data); + size_t byte_size() const; + Status write(uint8_t** ppos); + + scoped_refptr<Schema> _schema; + struct CellInfo { + CellInfo() = default; + uint32_t isset = 0; + uint32_t isnullable = 0; + const uint8_t* data = nullptr; + }; + vector<CellInfo> _temp_cells; + size_t _bit_set_size = 0; + size_t _bit_nullable_size = 0; + size_t _row_size = 0; + size_t _row_capacity = 0; + + std::vector<uint8_t> _buffer; +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/schema.cpp b/be/src/olap/memory/schema.cpp index cf9b3d3..d0b89c4 100644 --- a/be/src/olap/memory/schema.cpp +++ b/be/src/olap/memory/schema.cpp @@ -17,9 +17,50 @@ #include "olap/memory/schema.h" +#include "gutil/strings/split.h" + namespace doris { namespace memory { +bool supported(ColumnType type) { + switch (type) { + case OLAP_FIELD_TYPE_TINYINT: + case
OLAP_FIELD_TYPE_SMALLINT: + case OLAP_FIELD_TYPE_INT: + case OLAP_FIELD_TYPE_BIGINT: + case OLAP_FIELD_TYPE_LARGEINT: + case OLAP_FIELD_TYPE_FLOAT: + case OLAP_FIELD_TYPE_DOUBLE: + case OLAP_FIELD_TYPE_BOOL: + return true; + default: + return false; + } +} + +size_t get_type_byte_size(ColumnType type) { + switch (type) { + case OLAP_FIELD_TYPE_TINYINT: + return 1; + case OLAP_FIELD_TYPE_SMALLINT: + return 2; + case OLAP_FIELD_TYPE_INT: + return 4; + case OLAP_FIELD_TYPE_BIGINT: + return 8; + case OLAP_FIELD_TYPE_LARGEINT: + return 16; + case OLAP_FIELD_TYPE_FLOAT: + return 4; + case OLAP_FIELD_TYPE_DOUBLE: + return 8; + case OLAP_FIELD_TYPE_BOOL: + return 1; + default: + return 0; + } +} + ColumnSchema::ColumnSchema(const TabletColumn& tcolumn) : _tcolumn(tcolumn) {} ColumnSchema::ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable, @@ -44,15 +85,46 @@ std::string ColumnSchema::debug_string() const { ////////////////////////////////////////////////////////////////////////////// +Status Schema::create(const string& desc, scoped_refptr<Schema>* sc) { + TabletSchemaPB tspb; + std::vector<std::string> cs = strings::Split(desc, ",", strings::SkipWhitespace()); + uint32_t cid = 1; + for (std::string& c : cs) { + ColumnPB* cpb = tspb.add_column(); + std::vector<std::string> fs = strings::Split(c, " ", strings::SkipWhitespace()); + if (fs.size() < 2) { + return Status::InvalidArgument("bad schema desc"); + } + cpb->set_is_key(cid == 1); + cpb->set_unique_id(cid++); + cpb->set_name(fs[0]); + cpb->set_type(fs[1]); + if (fs.size() == 3 && fs[2] == "null") { + cpb->set_is_nullable(true); + } + } + tspb.set_keys_type(KeysType::UNIQUE_KEYS); + tspb.set_next_column_unique_id(cid); + tspb.set_num_short_key_columns(1); + tspb.set_is_in_memory(false); + TabletSchema ts; + ts.init_from_pb(tspb); + sc->reset(new Schema(ts)); + return Status::OK(); +} + Schema::Schema(const TabletSchema& tschema) : _tschema(tschema) { _cid_size = 1; 
_cid_to_col.resize(_cid_size, nullptr); + _column_byte_sizes.resize(_cid_size, 0); for (size_t i = 0; i < num_columns(); i++) { const ColumnSchema* cs = get(i); _cid_size = std::max(_cid_size, cs->cid() + 1); _cid_to_col.resize(_cid_size, nullptr); _cid_to_col[cs->cid()] = cs; _name_to_col[cs->name()] = cs; + _column_byte_sizes.resize(_cid_size, 0); + _column_byte_sizes[cs->cid()] = get_type_byte_size(cs->type()); } } diff --git a/be/src/olap/memory/schema.h b/be/src/olap/memory/schema.h index ebe3571..9a41480 100644 --- a/be/src/olap/memory/schema.h +++ b/be/src/olap/memory/schema.h @@ -29,22 +29,36 @@ namespace memory { // Memory engine's column type, just use FieldType for now typedef FieldType ColumnType; +// Return true if this ColumnType is supported +bool supported(ColumnType type); + // Memory engine's column schema, simple wrapper of TabletColumn. // TODO: Add more properties and methods later class ColumnSchema { public: explicit ColumnSchema(const TabletColumn& tcolumn); ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable, bool is_key); + + // Get column id inline uint32_t cid() const { return static_cast<uint32_t>(_tcolumn.unique_id()); } + + // Get column name inline std::string name() const { return _tcolumn.name(); } + + // Get column type inline ColumnType type() const { return _tcolumn.type(); } + + // Get is nullable inline bool is_nullable() const { return _tcolumn.is_nullable(); } + + // Get is key inline bool is_key() const { return _tcolumn.is_key(); } std::string type_name() const; std::string debug_string() const; private: + // Note: do not add more field into this class, it needs to be identical to TabletColumn TabletColumn _tcolumn; }; @@ -55,25 +69,47 @@ private: // 2. 
in the future, there may be a special compound primary key column // if primary-key has multiple columns // TODO: Add more properties and methods later -class Schema { +class Schema : public RefCountedThreadSafe<Schema> { public: + // Create schema by description string, utility method for tests + static Status create(const string& desc, scoped_refptr<Schema>* sc); + explicit Schema(const TabletSchema& tschema); + std::string debug_string() const; + inline size_t num_columns() const { return _tschema.num_columns(); } + inline size_t num_key_columns() const { return _tschema.num_key_columns(); } + // Get ColumnSchema by index const ColumnSchema* get(size_t idx) const; + // Get ColumnSchema by name const ColumnSchema* get_by_name(const string& name) const; + // Get column id space size + // + // For example: + // If a schema has 5 columns with ids [1, 2, 3, 5, 6] + // its cid_size equals max(cid)+1 = 7 uint32_t cid_size() const; + + // Get ColumnSchema by column id const ColumnSchema* get_by_cid(uint32_t cid) const; + // Get column type byte size by column id + size_t get_column_byte_size(uint32_t cid) const { + DCHECK_LT(cid, _column_byte_sizes.size()); + return _column_byte_sizes[cid]; + } + private: TabletSchema _tschema; uint32_t _cid_size; std::unordered_map<string, const ColumnSchema*> _name_to_col; vector<const ColumnSchema*> _cid_to_col; + vector<size_t> _column_byte_sizes; }; } // namespace memory diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/write_txn.cpp similarity index 70% copy from be/src/olap/memory/mem_tablet.cpp copy to be/src/olap/memory/write_txn.cpp index 5c2edd8..908a0c5 100644 --- a/be/src/olap/memory/mem_tablet.cpp +++ b/be/src/olap/memory/write_txn.cpp @@ -15,15 +15,23 @@ // specific language governing permissions and limitations // under the License.
-#include "olap/memory/mem_tablet.h" +#include "olap/memory/write_txn.h" namespace doris { namespace memory { -MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) - : BaseTablet(tablet_meta, data_dir) {} +WriteTxn::WriteTxn(scoped_refptr<Schema>* schema) : _schema(schema->get()) {} -MemTablet::~MemTablet() {} +WriteTxn::~WriteTxn() {} + +PartialRowBatch* WriteTxn::new_batch() { + _batches.emplace_back(new PartialRowBatch(&_schema)); + return _batches.back().get(); +} + +PartialRowBatch* WriteTxn::get_batch(size_t idx) const { + return _batches[idx].get(); +} } // namespace memory } // namespace doris diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/write_txn.h similarity index 50% copy from be/src/olap/memory/mem_tablet.h copy to be/src/olap/memory/write_txn.h index 7efc945..74cee3c 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/write_txn.h @@ -17,24 +17,41 @@ #pragma once -#include "olap/base_tablet.h" +#include "olap/memory/common.h" +#include "olap/memory/partial_row_batch.h" +#include "olap/memory/schema.h" namespace doris { namespace memory { -// Tablet class for memory-optimized storage engine. +class PartialRowBatch; + +// Class for write transaction // -// It stores all its data in-memory, and is designed for tables with -// frequent updates. +// Note: Currently it stores all its operations in memory, to make things simple, +// so we can quickly complete the whole create/read/write pipeline. The data structure may +// change as the project evolves. // -// TODO: This is just a skeleton, will add implementation in the future. -class MemTablet : public BaseTablet { +// TODO: add write to/load from WritexTx files in future. 
+class WriteTxn { public: - MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); - virtual ~MemTablet(); + explicit WriteTxn(scoped_refptr<Schema>* schema); + ~WriteTxn(); + + const Schema& schema() const { return *_schema.get(); } + + // Get number of batches + size_t batch_size() const { return _batches.size(); } + + // Get batch by index + PartialRowBatch* get_batch(size_t idx) const; + + // Add a new batch to this WriteTx + PartialRowBatch* new_batch(); private: - DISALLOW_COPY_AND_ASSIGN(MemTablet); + scoped_refptr<Schema> _schema; + std::vector<std::unique_ptr<PartialRowBatch>> _batches; }; } // namespace memory diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 96f1de4..2b49580 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -53,14 +53,6 @@ TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, return std::make_shared<Tablet>(tablet_meta, data_dir); } -void Tablet::_gen_tablet_path() { - std::string path = _data_dir->path() + DATA_PREFIX; - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id())); - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id())); - path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash())); - _tablet_path = path; -} - Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : BaseTablet(tablet_meta, data_dir), _is_bad(false), @@ -69,7 +61,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _last_cumu_compaction_success_millis(0), _last_base_compaction_success_millis(0), _cumulative_point(kInvalidCumulativePoint) { - _gen_tablet_path(); _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas()); } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index f8746f8..40cac13 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -78,7 +78,7 @@ public: inline KeysType keys_type() const; inline size_t num_columns() const; inline 
size_t num_null_columns() const; - inline size_t num_key_columns() const ; + inline size_t num_key_columns() const; inline size_t num_short_key_columns() const; inline size_t num_rows_per_row_block() const; inline CompressKind compress_kind() const; @@ -233,7 +233,6 @@ private: OLAPStatus _contains_version(const Version& version); void _max_continuous_version_from_begining_unlocked(Version* version, VersionHash* v_hash) const ; - void _gen_tablet_path(); RowsetSharedPtr _rowset_with_largest_size(); void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& version_path, diff --git a/be/src/util/time.h b/be/src/util/time.h index 01d56c6..5a4bfbb 100755 --- a/be/src/util/time.h +++ b/be/src/util/time.h @@ -59,6 +59,10 @@ inline int64_t MonotonicSeconds() { return GetMonoTimeMicros() / MICROS_PER_SEC; } +inline double GetMonoTimeSecondsAsDouble() { + return GetMonoTimeMicros() / static_cast<double>(MICROS_PER_SEC); +} + // Returns the time since the Epoch measured in microseconds. inline int64_t GetCurrentTimeMicros() { timespec ts; diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index 48c34ce..f00d7b7 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -86,3 +86,4 @@ ADD_BE_TEST(memory/hash_index_test) ADD_BE_TEST(memory/column_delta_test) ADD_BE_TEST(memory/schema_test) ADD_BE_TEST(memory/column_test) +ADD_BE_TEST(memory/partial_row_batch_test) diff --git a/be/test/olap/memory/partial_row_batch_test.cpp b/be/test/olap/memory/partial_row_batch_test.cpp new file mode 100644 index 0000000..354795c --- /dev/null +++ b/be/test/olap/memory/partial_row_batch_test.cpp @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/partial_row_batch.h" + +#include <gtest/gtest.h> + +#include <vector> + +#include "util/hash_util.hpp" + +namespace doris { +namespace memory { + +TEST(PartialRowbatch, write) { + scoped_refptr<Schema> sc; + ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok()); + PartialRowWriter writer(&sc); + srand(1); + const int N = 1000; + size_t nrow = 0; + // add insert/update operation + EXPECT_TRUE(writer.start_batch().ok()); + for (int i = 0; i < N; i++) { + nrow++; + writer.start_row(); + int id = i; + int uv = rand() % 10000; + int pv = rand() % 10000; + int8_t city = rand() % 100; + EXPECT_TRUE(writer.set("id", &id).ok()); + if (i % 3 == 0) { + EXPECT_TRUE(writer.set("uv", &uv).ok()); + EXPECT_TRUE(writer.set("pv", &pv).ok()); + EXPECT_TRUE(writer.set("city", city % 2 == 0 ? 
nullptr : &city).ok()); + } + EXPECT_TRUE(writer.end_row().ok()); + } + vector<uint8_t> buffer; + writer.finish_batch(&buffer); + + PartialRowBatch rb(&sc); + EXPECT_TRUE(rb.load(std::move(buffer)).ok()); + EXPECT_EQ(rb.row_size(), nrow); + // read from rowbatch and check equality + srand(1); + for (size_t i = 0; i < nrow; i++) { + bool has_row = false; + EXPECT_TRUE(rb.next_row(&has_row).ok()); + EXPECT_TRUE(has_row); + if (i % 3 == 0) { + EXPECT_EQ(rb.cur_row_cell_size(), 4); + } else { + EXPECT_EQ(rb.cur_row_cell_size(), 1); + } + int id = i; + int uv = rand() % 10000; + int pv = rand() % 10000; + int8_t city = rand() % 100; + + const ColumnSchema* cs = nullptr; + const void* data = nullptr; + + EXPECT_TRUE(rb.cur_row_get_cell(0, &cs, &data).ok()); + EXPECT_EQ(cs->cid(), 1); + EXPECT_EQ(*(int32_t*)data, id); + + if (i % 3 == 0) { + EXPECT_TRUE(rb.cur_row_get_cell(1, &cs, &data).ok()); + EXPECT_EQ(cs->cid(), 2); + EXPECT_EQ(*(int32_t*)data, uv); + + EXPECT_TRUE(rb.cur_row_get_cell(2, &cs, &data).ok()); + EXPECT_EQ(cs->cid(), 3); + EXPECT_EQ(*(int32_t*)data, pv); + + EXPECT_TRUE(rb.cur_row_get_cell(3, &cs, &data).ok()); + EXPECT_EQ(cs->cid(), 4); + if (city % 2 == 0) { + EXPECT_EQ(data, nullptr); + } else { + EXPECT_EQ(*(int8_t*)data, city); + } + } + } + bool has_row = false; + EXPECT_TRUE(rb.next_row(&has_row).ok()); + EXPECT_FALSE(has_row); +} + +} // namespace memory +} // namespace doris + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/memory/schema_test.cpp b/be/test/olap/memory/schema_test.cpp index 8772288..71d7d52 100644 --- a/be/test/olap/memory/schema_test.cpp +++ b/be/test/olap/memory/schema_test.cpp @@ -32,6 +32,20 @@ TEST(ColumnSchema, create) { EXPECT_TRUE(cs.is_key()); } +TEST(Schema, desc_create) { + scoped_refptr<Schema> sc; + ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok()); + ASSERT_EQ(sc->num_columns(), 4); + 
ASSERT_EQ(sc->num_key_columns(), 1); + ASSERT_EQ(sc->get(0)->cid(), 1); + ASSERT_EQ(sc->get(1)->cid(), 2); + ASSERT_EQ(sc->get(2)->cid(), 3); + ASSERT_EQ(sc->get(3)->cid(), 4); + ASSERT_EQ(sc->get_by_name("city")->is_nullable(), true); + ASSERT_EQ(sc->get_by_name("pv")->is_nullable(), false); + ASSERT_EQ(sc->get_by_name("uv")->type(), ColumnType::OLAP_FIELD_TYPE_INT); +} + TEST(Schema, create) { TabletSchemaPB tspb; auto cpb = tspb.add_column(); @@ -52,10 +66,10 @@ TEST(Schema, create) { tspb.set_is_in_memory(false); TabletSchema ts; ts.init_from_pb(tspb); - Schema schema(ts); - EXPECT_EQ(schema.cid_size(), 3); - EXPECT_EQ(schema.get_by_name("uid")->name(), std::string("uid")); - EXPECT_EQ(schema.get_by_cid(1)->name(), std::string("uid")); + scoped_refptr<Schema> schema(new Schema(ts)); + EXPECT_EQ(schema->cid_size(), 3); + EXPECT_EQ(schema->get_by_name("uid")->name(), std::string("uid")); + EXPECT_EQ(schema->get_by_cid(1)->name(), std::string("uid")); } } // namespace memory --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org