This is an automated email from the ASF dual-hosted git repository. lichaoyong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a7cfafe [Memory Engine] add core column related classes (#3508) a7cfafe is described below commit a7cfafe076d067f5b73ba6e55ccf54523600326b Author: Binglin Chang <decst...@gmail.com> AuthorDate: Wed May 13 16:30:32 2020 +0800 [Memory Engine] add core column related classes (#3508) add core column related classes --- be/src/olap/memory/CMakeLists.txt | 6 ++ be/src/olap/memory/{mem_tablet.h => buffer.cpp} | 48 ++++++--- be/src/olap/memory/buffer.h | 74 ++++++++++++++ be/src/olap/memory/column.cpp | 124 ++++++++++++++++++++++++ be/src/olap/memory/column.h | 107 ++++++++++++++++++++ be/src/olap/memory/column_block.cpp | 71 ++++++++++++++ be/src/olap/memory/column_block.h | 61 ++++++++++++ be/src/olap/memory/column_delta.cpp | 55 +++++++++++ be/src/olap/memory/column_delta.h | 56 +++++++++++ be/src/olap/memory/{mem_tablet.h => common.h} | 37 ++++--- be/src/olap/memory/delta_index.cpp | 53 ++++++++++ be/src/olap/memory/delta_index.h | 76 +++++++++++++++ be/src/olap/memory/hash_index.cpp | 2 + be/src/olap/memory/hash_index.h | 9 +- be/src/olap/memory/mem_tablet.cpp | 9 +- be/src/olap/memory/mem_tablet.h | 12 ++- be/src/olap/memory/schema.cpp | 102 +++++++++++++++++++ be/src/olap/memory/schema.h | 80 +++++++++++++++ be/test/olap/CMakeLists.txt | 2 + be/test/olap/memory/column_delta_test.cpp | 81 ++++++++++++++++ be/test/olap/memory/hash_index_test.cpp | 2 + be/test/olap/memory/schema_test.cpp | 67 +++++++++++++ 22 files changed, 1091 insertions(+), 43 deletions(-) diff --git a/be/src/olap/memory/CMakeLists.txt b/be/src/olap/memory/CMakeLists.txt index 366e8e5..9de9095 100644 --- a/be/src/olap/memory/CMakeLists.txt +++ b/be/src/olap/memory/CMakeLists.txt @@ -22,6 +22,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory") set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap/memory") add_library(Memory STATIC + buffer.cpp + column.cpp + column_block.cpp + column_delta.cpp + delta_index.cpp hash_index.cpp mem_tablet.cpp + schema.cpp ) diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/buffer.cpp similarity index 53% copy from be/src/olap/memory/mem_tablet.h copy to be/src/olap/memory/buffer.cpp index 9fd1312..167b1df 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/buffer.cpp @@ -15,25 +15,41 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ -#define DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ - -#include "olap/base_tablet.h" +#include "olap/memory/buffer.h" namespace doris { +namespace memory { -// Tablet class for memory-optimized storage engine. -// -// It stores all its data in-memory, and is designed for tables with -// frequent updates. -// -// TODO: This is just a skeleton, will add implementation in the future. -class MemTablet : public BaseTablet { +Status Buffer::alloc(size_t bsize) { + if (bsize > 0) { + uint8_t* data = + reinterpret_cast<uint8_t*>(aligned_malloc(bsize, bsize >= 4096 ? 4096 : 64)); + if (!data) { + return Status::MemoryAllocFailed(StringPrintf("alloc buffer size=%zu failed", bsize)); + } + _data = data; + _bsize = bsize; + } + return Status::OK(); +} + +void Buffer::clear() { + if (_data) { + free(_data); + _data = nullptr; + _bsize = 0; + } +} -private: - DISALLOW_COPY_AND_ASSIGN(MemTablet); -}; +void Buffer::set_zero() { + if (_data) { + memset(_data, 0, _bsize); + } +} -} /* namespace doris */ +Buffer::~Buffer() { + clear(); +} -#endif /* DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ */ +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/buffer.h b/be/src/olap/memory/buffer.h new file mode 100644 index 0000000..bb40132 --- /dev/null +++ b/be/src/olap/memory/buffer.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" + +namespace doris { +namespace memory { + +// A generic buffer holding column base and delta data +// It can be considered as an array of any primitive type, but it does not +// have compile time type information, user can use utility method as<T> to +// get typed array view. +class Buffer { +public: + Buffer() = default; + ~Buffer(); + + // allocate memory for this buffer, with buffer byte size of bsize + Status alloc(size_t bsize); + + // clear buffer, free memory + void clear(); + + // set all memory content to zero + void set_zero(); + + // return true if this buffer is not empty + operator bool() const { return _data != nullptr; } + + // returns a direct pointer to the memory array + const uint8_t* data() const { return _data; } + + // returns a direct pointer to the memory array + uint8_t* data() { return _data; } + + // get byte size of the buffer + size_t bsize() const { return _bsize; } + + // get typed array view + template <class T> + T* as() { + return reinterpret_cast<T*>(_data); + } + + // get typed array view + template <class T> + const T* as() const { + return reinterpret_cast<const T*>(_data); + } + +private: + size_t _bsize = 0; + uint8_t* _data = nullptr; + DISALLOW_COPY_AND_ASSIGN(Buffer); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column.cpp b/be/src/olap/memory/column.cpp new file mode 100644 index 0000000..4ba70e4 --- /dev/null +++ b/be/src/olap/memory/column.cpp @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/column.h" + +namespace doris { +namespace memory { + +Column::Column(const ColumnSchema& cs, ColumnType storage_type, uint64_t version) + : _cs(cs), _storage_type(storage_type), _base_idx(0) { + _base.reserve(BASE_CAPACITY_MIN_STEP_SIZE); + _versions.reserve(VERSION_CAPACITY_STEP_SIZE); + _versions.emplace_back(version); +} + +Column::Column(const Column& rhs, size_t new_base_capacity, size_t new_version_capacity) + : _cs(rhs._cs), _storage_type(rhs._storage_type), _base_idx(rhs._base_idx) { + _base.reserve(std::max(new_base_capacity, rhs._base.capacity())); + _base.resize(rhs._base.size()); + for (size_t i = 0; i < _base.size(); i++) { + _base[i] = rhs._base[i]; + } + _versions.reserve(std::max(new_version_capacity, rhs._versions.capacity())); + _versions.resize(rhs._versions.size()); + for (size_t i = 0; i < _versions.size(); i++) { + _versions[i] = rhs._versions[i]; + } +} + +size_t Column::memory() const { + size_t bs = _base.size(); + size_t ds = _versions.size(); + size_t base_memory = 0; + for (size_t i = 0; i < bs; i++) { + base_memory += _base[i]->memory(); + } + size_t delta_memory = 0; + for (size_t i = 0; i < ds; i++) { + if (_versions[i].delta) { + delta_memory += _versions[i].delta->memory(); + } + } + return base_memory + delta_memory; +} + +string Column::debug_string() const { + return StringPrintf("Column(%s version=%zu)", _cs.debug_string().c_str(), + _versions.back().version); +} + +Status Column::capture_version(uint64_t version, vector<ColumnDelta*>* deltas, + uint64_t* real_version) const { + uint64_t base_version = _versions[_base_idx].version; + *real_version = base_version; + if (version < base_version) { + uint64_t oldest = _versions[0].version; + if (version < oldest) { + return Status::NotFound( + StringPrintf("version %zu(oldest=%zu) deleted", version, oldest)); + } + DCHECK_GT(_base_idx, 0); + for (ssize_t i = static_cast<ssize_t>(_base_idx) - 1; i >= 0; i--) { + uint64_t v = _versions[i].version; + if (v >= version) { + DCHECK(_versions[i].delta); + *real_version = v; + deltas->emplace_back(_versions[i].delta.get()); + if (v == version) { + break; + } + } else { + break; + } + } + } else if (version > base_version) { + size_t vsize = _versions.size(); + for (size_t i = _base_idx + 1; i < vsize; i++) { + uint64_t v = _versions[i].version; + if (v <= version) { + DCHECK(_versions[i].delta); + *real_version = v; + deltas->emplace_back(_versions[i].delta.get()); + if (v == version) { + break; + } + } else { + break; + } + } + } + return Status::OK(); +} + +void Column::capture_latest(vector<ColumnDelta*>* deltas) const { + deltas->reserve(_versions.size() - _base_idx - 1); + for (size_t i = _base_idx + 1; i < _versions.size(); i++) { + deltas->emplace_back(_versions[i].delta.get()); + } +} + +Status Column::read(uint64_t version, std::unique_ptr<ColumnReader>* reader) { + return Status::NotSupported("not supported"); +} + +Status Column::write(std::unique_ptr<ColumnWriter>* writer) { + return Status::NotSupported("not supported"); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column.h b/be/src/olap/memory/column.h new file mode 100644 index 0000000..1c59d1d --- /dev/null +++ b/be/src/olap/memory/column.h @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/column_block.h" +#include "olap/memory/column_delta.h" +#include "olap/memory/common.h" +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +class ColumnReader; +class ColumnWriter; + +// Column store all the data of a column, including base and deltas. +// It supports single-writer multi-reader concurrency. +// It's properties are all immutable except _base and _versions. +// _base and _versions use std::vector, which is basically thread-safe +// in-practice for single-writer/multi-reader access, if there isn't +// any over-capacity realloc or delta compaction/GC caused data change. +// When these situations occur, we do a copy-on-write. +// +// TODO: add column read&writer +class Column : public RefCountedThreadSafe<Column> { +public: + static const uint32_t BLOCK_SIZE = 1 << 16; + static const uint32_t BLOCK_MASK = 0xffff; + // base vector capacity min grow step size + static const uint32_t BASE_CAPACITY_MIN_STEP_SIZE = 8; + // base vector capacity max grow step size + static const uint32_t BASE_CAPACITY_MAX_STEP_SIZE = 8; + // version vector capacity grow step size + static const uint32_t VERSION_CAPACITY_STEP_SIZE = 8; + + // create a Column which provided column schema, underlying storage_type and initial version + Column(const ColumnSchema& cs, ColumnType storage_type, uint64_t version); + + // copy-on-write a new Column with new capacity + Column(const Column& rhs, size_t new_base_capacity, size_t new_version_capacity); + + // get column schema + const ColumnSchema& schema() { return _cs; } + + // get memory usage in bytes + size_t memory() const; + + string debug_string() const; + + // read this Column at a specific version, get a reader for this Column + // support multiple concurrent readers + Status read(uint64_t version, std::unique_ptr<ColumnReader>* reader); + + // write this Column, get a writer for this Column + // caller needs to make sure there is only one or no writer exists at any time + Status write(std::unique_ptr<ColumnWriter>* writer); + +private: + ColumnSchema _cs; + // For some types the storage_type may be different from actual type from schema. + // For example, string stored in dictionary, so column_block store a integer id, + // and the storage type may change as the dictionary grows, e.g. from uint8 to uint16 + ColumnType _storage_type; + // base's position at _versions vector + size_t _base_idx; + // base data, a vector of ColumnBlocks + vector<scoped_refptr<ColumnBlock>> _base; + struct VersionInfo { + VersionInfo() = default; + explicit VersionInfo(uint64_t version) : version(version) {} + uint64_t version = 0; + // null if it's base + scoped_refptr<ColumnDelta> delta; + }; + // version vector + vector<VersionInfo> _versions; + + // get related deltas of a specified version, and it's corresponding real_version + // For example: + // if we have [1,3,5,7,13,16,20,30] in versions array, and base is at version 13 + // capture version 24 will get deltas=[13, 16, 20], and real_version 20 + Status capture_version(uint64_t version, vector<ColumnDelta*>* deltas, + uint64_t* real_version) const; + + // get latest version's related delta + void capture_latest(vector<ColumnDelta*>* deltas) const; + + DISALLOW_COPY_AND_ASSIGN(Column); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column_block.cpp b/be/src/olap/memory/column_block.cpp new file mode 100644 index 0000000..1d25fbc --- /dev/null +++ b/be/src/olap/memory/column_block.cpp @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/column_block.h" + +namespace doris { +namespace memory { + +size_t ColumnBlock::memory() const { + return _data.bsize() + _nulls.bsize(); +} + +Status ColumnBlock::alloc(size_t size, size_t esize) { + if (_data || _nulls) { + LOG(FATAL) << "reinit column page"; + } + RETURN_IF_ERROR(_data.alloc(size * esize)); + _data.set_zero(); + _size = size; + return Status::OK(); +} + +Status ColumnBlock::set_null(uint32_t idx) { + if (!_nulls) { + RETURN_IF_ERROR(_nulls.alloc(_size)); + _nulls.set_zero(); + } + _nulls.as<bool>()[idx] = true; + return Status::OK(); +} + +Status ColumnBlock::set_not_null(uint32_t idx) { + if (_nulls) { + _nulls.as<bool>()[idx] = false; + } + return Status::OK(); +} + +Status ColumnBlock::copy_to(ColumnBlock* dest, size_t size, size_t esize) { + if (size > dest->size()) { + return Status::InvalidArgument("ColumnBlock copy to a smaller ColumnBlock"); + } + if (dest->nulls()) { + if (nulls()) { + memcpy(dest->nulls().data(), nulls().data(), size); + } else { + memset(dest->nulls().data(), 0, size); + } + } else if (nulls()) { + RETURN_IF_ERROR(dest->nulls().alloc(dest->size())); + memcpy(dest->nulls().data(), nulls().data(), size); + } + return Status::OK(); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column_block.h b/be/src/olap/memory/column_block.h new file mode 100644 index 0000000..5dc0024 --- /dev/null +++ b/be/src/olap/memory/column_block.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/buffer.h" +#include "olap/memory/common.h" + +namespace doris { +namespace memory { + +// ColumnBlock stores one block of data for a Column +class ColumnBlock : public RefCountedThreadSafe<ColumnBlock> { +public: + ColumnBlock() = default; + + size_t memory() const; + + size_t size() const { return _size; } + + Buffer& data() { return _data; } + + Buffer& nulls() { return _nulls; } + + // Allocate memory for this block, with space for size elements and each + // element have esize byte size + Status alloc(size_t size, size_t esize); + + bool is_null(uint32_t idx) const { return _nulls && _nulls.as<bool>()[idx]; } + + Status set_null(uint32_t idx); + + Status set_not_null(uint32_t idx); + + // Copy the first size elements to dest ColumnBlock, each element has + // esize byte size + Status copy_to(ColumnBlock* dest, size_t size, size_t esize); + +private: + size_t _size = 0; + Buffer _nulls; + Buffer _data; + DISALLOW_COPY_AND_ASSIGN(ColumnBlock); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column_delta.cpp b/be/src/olap/memory/column_delta.cpp new file mode 100644 index 0000000..22b3217 --- /dev/null +++ b/be/src/olap/memory/column_delta.cpp @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/column_delta.h" + +namespace doris { +namespace memory { + +size_t ColumnDelta::memory() const { + return _index->memory() + _nulls.bsize() + _data.bsize(); +} + +Status ColumnDelta::alloc(size_t nblock, size_t size, size_t esize, bool has_null) { + if (_data || _nulls) { + LOG(FATAL) << "reinit column delta"; + } + _index.reset(new DeltaIndex()); + _index->block_ends().resize(nblock, 0); + Status ret = _index->data().alloc(size * sizeof(uint16_t)); + if (!ret.ok()) { + return ret; + } + ret = _data.alloc(size * esize); + if (!ret.ok()) { + return ret; + } + if (has_null) { + ret = _nulls.alloc(size); + if (!ret.ok()) { + _data.clear(); + return Status::MemoryAllocFailed("init column delta nulls"); + } + _nulls.set_zero(); + } + _data.set_zero(); + _size = size; + return Status::OK(); +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/column_delta.h b/be/src/olap/memory/column_delta.h new file mode 100644 index 0000000..0db4ada --- /dev/null +++ b/be/src/olap/memory/column_delta.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" +#include "olap/memory/delta_index.h" + +namespace doris { +namespace memory { + +// ColumnDelta store a column's updates of a commit(version) +class ColumnDelta : public RefCountedThreadSafe<ColumnDelta> { +public: + ColumnDelta() = default; + + size_t memory() const; + + size_t size() const { return _size; } + + Buffer& nulls() { return _nulls; } + + Buffer& data() { return _data; } + + DeltaIndex* index() { return _index.get(); } + + bool contains_block(uint32_t bid) const { return _index->contains_block(bid); } + + uint32_t find_idx(uint32_t rid) { return _index->find_idx(rid); } + + Status alloc(size_t nblock, size_t size, size_t esize, bool has_null); + +private: + size_t _size = 0; + scoped_refptr<DeltaIndex> _index; + Buffer _nulls; + Buffer _data; + DISALLOW_COPY_AND_ASSIGN(ColumnDelta); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/common.h similarity index 62% copy from be/src/olap/memory/mem_tablet.h copy to be/src/olap/memory/common.h index 9fd1312..ab952b9 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/common.h @@ -15,25 +15,30 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ -#define DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ +#pragma once -#include "olap/base_tablet.h" +#include <memory> -namespace doris { +#include "common/logging.h" +#include "common/status.h" +#include "gutil/ref_counted.h" +#include "gutil/stringprintf.h" +#include "olap/olap_common.h" +#include "olap/olap_define.h" +#include "olap/types.h" -// Tablet class for memory-optimized storage engine. -// -// It stores all its data in-memory, and is designed for tables with -// frequent updates. -// -// TODO: This is just a skeleton, will add implementation in the future. -class MemTablet : public BaseTablet { +namespace doris { +namespace memory { -private: - DISALLOW_COPY_AND_ASSIGN(MemTablet); -}; +template <class T, class ST> +inline T padding(T v, ST pad) { + return (v + pad - 1) / pad * pad; +} -} /* namespace doris */ +template <class T, class ST> +inline size_t num_block(T v, ST bs) { + return (v + bs - 1) / bs; +} -#endif /* DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ */ +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/delta_index.cpp b/be/src/olap/memory/delta_index.cpp new file mode 100644 index 0000000..faae352 --- /dev/null +++ b/be/src/olap/memory/delta_index.cpp @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/delta_index.h" + +namespace doris { +namespace memory { + +size_t DeltaIndex::memory() const { + return _data.bsize() + _block_ends.size() * sizeof(uint32_t); +} + +uint32_t DeltaIndex::find_idx(uint32_t rid) { + if (!_data) { + return npos; + } + uint32_t bid = rid >> 16; + if (bid >= _block_ends.size()) { + return npos; + } + // TODO: use SIMD + uint32_t start = bid > 0 ? _block_ends[bid - 1] : 0; + uint32_t end = _block_ends[bid]; + if (start == end) { + return npos; + } + uint16_t* astart = _data.as<uint16_t>() + start; + uint16_t* aend = _data.as<uint16_t>() + end; + uint32_t bidx = rid & 0xffff; + uint16_t* pos = std::lower_bound(astart, aend, bidx); + if ((pos != aend) && (*pos == bidx)) { + return pos - _data.as<uint16_t>(); + } else { + return npos; + } +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/delta_index.h b/be/src/olap/memory/delta_index.h new file mode 100644 index 0000000..00b6728 --- /dev/null +++ b/be/src/olap/memory/delta_index.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "olap/memory/buffer.h" +#include "olap/memory/common.h" + +namespace doris { +namespace memory { + +// DeltaIndex store all the updated rows' id(rowids) for a ColumnDelta. +// Rowids are sorted and divided into blocks, each 64K rowid space is a +// block. Since each block only have 64K id space, it can be store as uint16_t +// rather than uint32_t to save memory. +class DeltaIndex : public RefCountedThreadSafe<DeltaIndex> { +public: + static const uint32_t npos = 0xffffffffu; + + DeltaIndex() = default; + + // get memory consumption + size_t memory() const; + + // find rowid(rid) in the index, + // return index position if found, else return npos + uint32_t find_idx(uint32_t rid); + + // get a block's index position range as [start, end) + void block_range(uint32_t bid, uint32_t* start, uint32_t* end) const { + if (bid < _block_ends.size()) { + *start = bid > 0 ? _block_ends[bid - 1] : 0; + *end = _block_ends[bid]; + } else { + *start = 0; + *end = 0; + } + } + + // Return true if this index has any rowid belonging to this block + bool contains_block(uint32_t bid) const { + if (bid < _block_ends.size()) { + return (bid > 0 ? _block_ends[bid - 1] : 0) < _block_ends[bid]; + } + return false; + } + + Buffer& data() { return _data; } + const Buffer& data() const { return _data; } + vector<uint32_t>& block_ends() { return _block_ends; } + const vector<uint32_t>& block_ends() const { return _block_ends; } + +private: + vector<uint32_t> _block_ends; + Buffer _data; + DISALLOW_COPY_AND_ASSIGN(DeltaIndex); +}; + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/hash_index.cpp b/be/src/olap/memory/hash_index.cpp index b314716..a7991f2 100644 --- a/be/src/olap/memory/hash_index.cpp +++ b/be/src/olap/memory/hash_index.cpp @@ -28,6 +28,7 @@ #include "gutil/stringprintf.h" namespace doris { +namespace memory { struct alignas(64) HashChunk { static const uint32_t CAPACITY = 12; @@ -166,4 +167,5 @@ const std::string HashIndex::dump() const { size() / (_num_chunks * 12.0f)); } +} // namespace memory } // namespace doris diff --git a/be/src/olap/memory/hash_index.h b/be/src/olap/memory/hash_index.h index d962a14..35d79f9 100644 --- a/be/src/olap/memory/hash_index.h +++ b/be/src/olap/memory/hash_index.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_ -#define DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_ +#pragma once #include <stdint.h> @@ -27,6 +26,7 @@ #include "gutil/ref_counted.h" namespace doris { +namespace memory { struct HashChunk; @@ -103,6 +103,5 @@ private: HashChunk* _chunks; }; -} /* namespace doris */ - -#endif /* DORIS_BE_SRC_OLAP_MEMORY_HASH_INDEX_H_ */ +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_tablet.cpp b/be/src/olap/memory/mem_tablet.cpp index 9137c15..5c2edd8 100644 --- a/be/src/olap/memory/mem_tablet.cpp +++ b/be/src/olap/memory/mem_tablet.cpp @@ -18,5 +18,12 @@ #include "olap/memory/mem_tablet.h" namespace doris { +namespace memory { -} /* namespace doris */ +MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) + : BaseTablet(tablet_meta, data_dir) {} + +MemTablet::~MemTablet() {} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h index 9fd1312..7efc945 100644 --- a/be/src/olap/memory/mem_tablet.h +++ b/be/src/olap/memory/mem_tablet.h @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -#ifndef DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ -#define DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ +#pragma once #include "olap/base_tablet.h" namespace doris { +namespace memory { // Tablet class for memory-optimized storage engine. // @@ -29,11 +29,13 @@ namespace doris { // // TODO: This is just a skeleton, will add implementation in the future. class MemTablet : public BaseTablet { +public: + MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir); + virtual ~MemTablet(); private: DISALLOW_COPY_AND_ASSIGN(MemTablet); }; -} /* namespace doris */ - -#endif /* DORIS_BE_SRC_MEMORY_MEM_TABLET_H_ */ +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/schema.cpp b/be/src/olap/memory/schema.cpp new file mode 100644 index 0000000..cf9b3d3 --- /dev/null +++ b/be/src/olap/memory/schema.cpp @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/schema.h" + +namespace doris { +namespace memory { + +ColumnSchema::ColumnSchema(const TabletColumn& tcolumn) : _tcolumn(tcolumn) {} + +ColumnSchema::ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable, + bool is_key) { + ColumnPB cpb; + cpb.set_unique_id(cid); + cpb.set_name(name); + cpb.set_type(TabletColumn::get_string_by_field_type(type)); + cpb.set_is_nullable(nullable); + cpb.set_is_key(is_key); + _tcolumn.init_from_pb(cpb); +} + +std::string ColumnSchema::type_name() const { + return TabletColumn::get_string_by_field_type(_tcolumn.type()); +} + +std::string ColumnSchema::debug_string() const { + return StringPrintf("cid=%d %s %s%s%s", cid(), name().c_str(), type_name().c_str(), + is_nullable() ? " nullable" : "", is_key() ? " key" : ""); +} + +////////////////////////////////////////////////////////////////////////////// + +Schema::Schema(const TabletSchema& tschema) : _tschema(tschema) { + _cid_size = 1; + _cid_to_col.resize(_cid_size, nullptr); + for (size_t i = 0; i < num_columns(); i++) { + const ColumnSchema* cs = get(i); + _cid_size = std::max(_cid_size, cs->cid() + 1); + _cid_to_col.resize(_cid_size, nullptr); + _cid_to_col[cs->cid()] = cs; + _name_to_col[cs->name()] = cs; + } +} + +std::string Schema::debug_string() const { + std::string ret("("); + for (size_t i = 0; i < num_columns(); i++) { + const ColumnSchema* cs = get(i); + if (i > 0) { + ret.append(", "); + } + ret.append(cs->debug_string()); + } + ret.append(")"); + return ret; +} + +uint32_t Schema::cid_size() const { + return _cid_size; +} + +const ColumnSchema* Schema::get(size_t idx) const { + if (idx < num_columns()) { + // TODO: this is a hack, improve this in the future + return reinterpret_cast<const ColumnSchema*>(&_tschema.columns()[idx]); + } + return nullptr; +} + +const ColumnSchema* Schema::get_by_name(const string& name) const { + auto itr = _name_to_col.find(name); + if (itr == _name_to_col.end()) { + return nullptr; + } else { + return itr->second; + } +} + +const ColumnSchema* Schema::get_by_cid(uint32_t cid) const { + if (cid < _cid_to_col.size()) { + return _cid_to_col[cid]; + } else { + return nullptr; + } +} + +} // namespace memory +} // namespace doris diff --git a/be/src/olap/memory/schema.h b/be/src/olap/memory/schema.h new file mode 100644 index 0000000..ebe3571 --- /dev/null +++ b/be/src/olap/memory/schema.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/memory/common.h" +#include "olap/tablet_schema.h" + +namespace doris { +namespace memory { + +// This file contains type and schema adaptors +// from olap's type and schema to memory engine's type and schema + +// Memory engine's column type, just use FieldType for now +typedef FieldType ColumnType; + +// Memory engine's column schema, simple wrapper of TabletColumn. +// TODO: Add more properties and methods later +class ColumnSchema { +public: + explicit ColumnSchema(const TabletColumn& tcolumn); + ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool nullable, bool is_key); + inline uint32_t cid() const { return static_cast<uint32_t>(_tcolumn.unique_id()); } + inline std::string name() const { return _tcolumn.name(); } + inline ColumnType type() const { return _tcolumn.type(); } + inline bool is_nullable() const { return _tcolumn.is_nullable(); } + inline bool is_key() const { return _tcolumn.is_key(); } + + std::string type_name() const; + std::string debug_string() const; + +private: + TabletColumn _tcolumn; +}; + +// Memory engine's tablet schema, simple wrapper of TabletSchema. +// Schema have some differences comparing to original TabletSchema: +// 1. there is a hidden delete_flag column (with special cid=0) to mark +// deleted rows +// 2. in the future, there may be a special compound primary key column +// if primary-key has multiple columns +// TODO: Add more properties and methods later +class Schema { +public: + explicit Schema(const TabletSchema& tschema); + std::string debug_string() const; + inline size_t num_columns() const { return _tschema.num_columns(); } + inline size_t num_key_columns() const { return _tschema.num_key_columns(); } + + const ColumnSchema* get(size_t idx) const; + + const ColumnSchema* get_by_name(const string& name) const; + + uint32_t cid_size() const; + const ColumnSchema* get_by_cid(uint32_t cid) const; + +private: + TabletSchema _tschema; + uint32_t _cid_size; + std::unordered_map<string, const ColumnSchema*> _name_to_col; + vector<const ColumnSchema*> _cid_to_col; +}; + +} // namespace memory +} // namespace doris diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt index c3671a2..083b232 100644 --- a/be/test/olap/CMakeLists.txt +++ b/be/test/olap/CMakeLists.txt @@ -83,3 +83,5 @@ ADD_BE_TEST(selection_vector_test) ADD_BE_TEST(options_test) ADD_BE_TEST(fs/file_block_manager_test) ADD_BE_TEST(memory/hash_index_test) +ADD_BE_TEST(memory/column_delta_test) +ADD_BE_TEST(memory/schema_test) diff --git a/be/test/olap/memory/column_delta_test.cpp b/be/test/olap/memory/column_delta_test.cpp new file mode 100644 index 0000000..9a31ae8 --- /dev/null +++ b/be/test/olap/memory/column_delta_test.cpp @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/column_delta.h" + +#include <gtest/gtest.h> + +#include <map> +#include <vector> + +#include "olap/memory/column.h" + +namespace doris { +namespace memory { + +TEST(ColumnDelta, Index) { + const int BaseSize = 256001; + const int NumUpdate = 10000; + srand(1); + scoped_refptr<ColumnDelta> delta(new ColumnDelta()); + std::map<uint32_t, uint32_t> updates; + for (int i = 0; i < NumUpdate; i++) { + uint32_t idx = rand() % BaseSize; + updates[idx] = rand(); + } + size_t nblock = num_block(BaseSize, Column::BLOCK_SIZE); + ASSERT_TRUE(delta->alloc(nblock, updates.size(), sizeof(uint32_t), false).ok()); + DeltaIndex* index = delta->index(); + vector<uint32_t>& block_ends = index->block_ends(); + Buffer& idxdata = index->_data; + Buffer& data = delta->data(); + uint32_t cidx = 0; + uint32_t curbid = 0; + for (auto& e : updates) { + uint32_t rid = e.first; + uint32_t bid = rid >> 16; + while (curbid < bid) { + block_ends[curbid] = cidx; + curbid++; + } + idxdata.as<uint16_t>()[cidx] = rid & 0xffff; + data.as<uint32_t>()[cidx] = e.second; + cidx++; + } + while (curbid < nblock) { + block_ends[curbid] = cidx; + curbid++; + } + for (int i = 0; i < BaseSize; i++) { + uint32_t idx = delta->find_idx(i); + auto itr = updates.find(i); + if (itr == updates.end()) { + EXPECT_TRUE(idx == DeltaIndex::npos); + } else { + uint32_t v = delta->data().as<uint32_t>()[idx]; + EXPECT_EQ(v, itr->second); + } + } +} + +} // namespace memory +} // namespace doris + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/olap/memory/hash_index_test.cpp b/be/test/olap/memory/hash_index_test.cpp index 9ccbd04..94f3689 100644 --- a/be/test/olap/memory/hash_index_test.cpp +++ b/be/test/olap/memory/hash_index_test.cpp @@ -24,6 +24,7 @@ #include "gutil/hash/builtin_type_hash.h" namespace doris { +namespace memory { inline uint64_t HashCode(size_t v) { return Hash64NumWithSeed(v, 0); @@ -103,6 +104,7 @@ TEST(HashIndex, add) { LOG(INFO) << hi.dump(); } +} // namespace memory } // namespace doris int main(int argc, char** argv) { diff --git a/be/test/olap/memory/schema_test.cpp b/be/test/olap/memory/schema_test.cpp new file mode 100644 index 0000000..8772288 --- /dev/null +++ b/be/test/olap/memory/schema_test.cpp @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/memory/schema.h" + +#include <gtest/gtest.h> + +#include <vector> + +namespace doris { +namespace memory { + +TEST(ColumnSchema, create) { + ColumnSchema cs(1, "uid", ColumnType::OLAP_FIELD_TYPE_TINYINT, false, true); + EXPECT_EQ(1, cs.cid()); + EXPECT_EQ(std::string("uid"), cs.name()); + EXPECT_FALSE(cs.is_nullable()); + EXPECT_TRUE(cs.is_key()); +} + +TEST(Schema, create) { + TabletSchemaPB tspb; + auto cpb = tspb.add_column(); + cpb->set_unique_id(1); + cpb->set_name("uid"); + cpb->set_type(TabletColumn::get_string_by_field_type(FieldType::OLAP_FIELD_TYPE_INT)); + cpb->set_is_nullable(false); + cpb->set_is_key(true); + auto cpb2 = tspb.add_column(); + cpb2->set_unique_id(2); + cpb2->set_type(TabletColumn::get_string_by_field_type(FieldType::OLAP_FIELD_TYPE_INT)); + cpb2->set_name("city"); + cpb2->set_is_nullable(true); + cpb2->set_is_key(false); + tspb.set_keys_type(KeysType::UNIQUE_KEYS); + tspb.set_next_column_unique_id(3); + tspb.set_num_short_key_columns(1); + tspb.set_is_in_memory(false); + TabletSchema ts; + ts.init_from_pb(tspb); + Schema schema(ts); + EXPECT_EQ(schema.cid_size(), 3); + EXPECT_EQ(schema.get_by_name("uid")->name(), std::string("uid")); + EXPECT_EQ(schema.get_by_cid(1)->name(), std::string("uid")); +} + +} // namespace memory +} // namespace doris + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org