This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7524c5e  [Memory Engine] Add MemSubTablet, MemTablet, WriteTx, 
PartialRowBatch (#3637)
7524c5e is described below

commit 7524c5ef63becb184583dae4111a19bcb0b43e22
Author: Binglin Chang <decst...@gmail.com>
AuthorDate: Sat May 30 10:33:10 2020 +0800

    [Memory Engine] Add MemSubTablet, MemTablet, WriteTx, PartialRowBatch 
(#3637)
---
 be/src/olap/base_tablet.cpp                        |  13 +-
 be/src/olap/base_tablet.h                          |  14 +-
 be/src/olap/memory/CMakeLists.txt                  |   3 +
 be/src/olap/memory/common.h                        |   1 +
 be/src/olap/memory/mem_sub_tablet.cpp              | 246 ++++++++++++++++++
 be/src/olap/memory/mem_sub_tablet.h                | 120 +++++++++
 be/src/olap/memory/mem_tablet.cpp                  |  35 ++-
 be/src/olap/memory/mem_tablet.h                    |  47 +++-
 be/src/olap/memory/partial_row_batch.cpp           | 274 +++++++++++++++++++++
 be/src/olap/memory/partial_row_batch.h             | 172 +++++++++++++
 be/src/olap/memory/schema.cpp                      |  72 ++++++
 be/src/olap/memory/schema.h                        |  38 ++-
 .../olap/memory/{mem_tablet.cpp => write_txn.cpp}  |  16 +-
 be/src/olap/memory/{mem_tablet.h => write_txn.h}   |  35 ++-
 be/src/olap/tablet.cpp                             |   9 -
 be/src/olap/tablet.h                               |   3 +-
 be/src/util/time.h                                 |   4 +
 be/test/olap/CMakeLists.txt                        |   1 +
 be/test/olap/memory/partial_row_batch_test.cpp     | 111 +++++++++
 be/test/olap/memory/schema_test.cpp                |  22 +-
 20 files changed, 1199 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 41aa93a..368f5ed 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "base_tablet.h"
+#include "olap/base_tablet.h"
+#include "util/path_util.h"
+#include "olap/data_dir.h"
 
 namespace doris {
 
@@ -24,6 +26,7 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, 
DataDir* data_dir) :
         _tablet_meta(tablet_meta),
         _schema(tablet_meta->tablet_schema()),
         _data_dir(data_dir) {
+    _gen_tablet_path();
 }
 
 BaseTablet::~BaseTablet() {
@@ -40,4 +43,12 @@ OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
     return OLAP_SUCCESS;
 }
 
+// Compute _tablet_path by joining, in order:
+//   <data_dir path> + DATA_PREFIX, <shard_id>, <tablet_id>, <schema_hash>
+// A tablet may be constructed without a DataDir (MemTablet's factory defaults
+// data_dir to nullptr); in that case there is no storage root to derive a path
+// from, so _tablet_path is left untouched instead of dereferencing null.
+void BaseTablet::_gen_tablet_path() {
+    if (_data_dir == nullptr) {
+        return;
+    }
+    std::string path = _data_dir->path() + DATA_PREFIX;
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id()));
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id()));
+    path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash()));
+    _tablet_path = path;
+}
+
 } /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 34020eb..f3b0c2d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -19,12 +19,15 @@
 #define DORIS_BE_SRC_OLAP_BASE_TABLET_H
 
 #include <memory>
+
 #include "olap/olap_define.h"
 #include "olap/tablet_meta.h"
 #include "olap/utils.h"
 
 namespace doris {
 
+class DataDir;
+
 // Base class for all tablet classes, currently only olap/Tablet and
 // olap/memory/MemTablet.
 // The fields and methods in this class is not final, it will change as memory
@@ -57,10 +60,13 @@ public:
     inline void set_creation_time(int64_t creation_time);
     inline bool equal(int64_t tablet_id, int32_t schema_hash);
 
-    // propreties encapsulated in TabletSchema
+    // properties encapsulated in TabletSchema
     inline const TabletSchema& tablet_schema() const;
 
 protected:
+    void _gen_tablet_path();
+
+protected:
     TabletState _state;
     TabletMetaSharedPtr _tablet_meta;
     TabletSchema _schema;
@@ -72,7 +78,6 @@ private:
     DISALLOW_COPY_AND_ASSIGN(BaseTablet);
 };
 
-
 inline DataDir* BaseTablet::data_dir() const {
     return _data_dir;
 }
@@ -99,9 +104,8 @@ inline int64_t BaseTablet::table_id() const {
 
 inline const std::string BaseTablet::full_name() const {
     std::stringstream ss;
-    ss << _tablet_meta->tablet_id()
-       << "." << _tablet_meta->schema_hash()
-       << "." << _tablet_meta->tablet_uid().to_string();
+    ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << 
"."
+       << _tablet_meta->tablet_uid().to_string();
     return ss.str();
 }
 
diff --git a/be/src/olap/memory/CMakeLists.txt 
b/be/src/olap/memory/CMakeLists.txt
index 9de9095..b552dfe 100644
--- a/be/src/olap/memory/CMakeLists.txt
+++ b/be/src/olap/memory/CMakeLists.txt
@@ -29,5 +29,8 @@ add_library(Memory STATIC
     delta_index.cpp
     hash_index.cpp
     mem_tablet.cpp
+    mem_sub_tablet.cpp
+    partial_row_batch.cpp
     schema.cpp
+    write_txn.cpp
 )
diff --git a/be/src/olap/memory/common.h b/be/src/olap/memory/common.h
index ab952b9..2185dd2 100644
--- a/be/src/olap/memory/common.h
+++ b/be/src/olap/memory/common.h
@@ -26,6 +26,7 @@
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/types.h"
+#include "util/time.h"
 
 namespace doris {
 namespace memory {
diff --git a/be/src/olap/memory/mem_sub_tablet.cpp 
b/be/src/olap/memory/mem_sub_tablet.cpp
new file mode 100644
index 0000000..8bbdf5f
--- /dev/null
+++ b/be/src/olap/memory/mem_sub_tablet.cpp
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/mem_sub_tablet.h"
+
+#include "olap/memory/column.h"
+#include "olap/memory/column_reader.h"
+#include "olap/memory/column_writer.h"
+#include "olap/memory/hash_index.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// Factory: builds an empty MemSubTablet whose only version entry is `version`
+// with zero rows, and allocates one Column per schema column.
+// On success the new object is handed over through `ret`; on failure `ret`
+// is not modified.
+Status MemSubTablet::create(uint64_t version, const Schema& schema,
+                            std::unique_ptr<MemSubTablet>* ret) {
+    std::unique_ptr<MemSubTablet> sub_tablet(new MemSubTablet());
+    sub_tablet->_versions.reserve(64);
+    sub_tablet->_versions.emplace_back(version, 0);
+    // _columns is indexed by column id (cid), not by column position
+    sub_tablet->_columns.resize(schema.cid_size());
+    for (size_t pos = 0; pos < schema.num_columns(); ++pos) {
+        // TODO: support storage_type != c.type
+        auto& col_schema = *schema.get(pos);
+        if (!supported(col_schema.type())) {
+            return Status::NotSupported("column type not supported");
+        }
+        sub_tablet->_columns[col_schema.cid()].reset(
+                new Column(col_schema, col_schema.type(), version));
+    }
+    sub_tablet.swap(*ret);
+    return Status::OK();
+}
+
+// Private constructor (use create()): starts with a HashIndex constructed
+// with a capacity argument of 1 << 16.
+MemSubTablet::MemSubTablet() : _index(new HashIndex(1 << 16)) {}
+
+// Empty body: no explicit cleanup is performed here.
+MemSubTablet::~MemSubTablet() {}
+
+// Report the row count recorded for `version`.
+// `version == (uint64_t)-1` selects the newest entry. Otherwise the entry
+// chosen is the last one whose version is not greater than `version`;
+// NotFound is returned when `version` is older than the first entry.
+Status MemSubTablet::get_size(uint64_t version, size_t* size) const {
+    std::lock_guard<std::mutex> guard(_lock);
+    const bool want_latest = (version == static_cast<uint64_t>(-1));
+    if (!want_latest && _versions[0].version > version) {
+        return Status::NotFound("get_size failed, version too old");
+    }
+    // Default to the newest entry; replaced below if an older one matches
+    size_t chosen = _versions.size() - 1;
+    if (!want_latest) {
+        for (size_t next = 1; next < _versions.size(); ++next) {
+            if (_versions[next].version > version) {
+                chosen = next - 1;
+                break;
+            }
+        }
+    }
+    *size = _versions[chosen].size;
+    return Status::OK();
+}
+
+// Create a reader for column `cid` as of `version`.
+// Returns NotFound when `cid` is out of range or no column is stored for it;
+// otherwise forwards the result of Column::create_reader().
+Status MemSubTablet::read_column(uint64_t version, uint32_t cid,
+                                 std::unique_ptr<ColumnReader>* reader) {
+    scoped_refptr<Column> cl;
+    {
+        // _columns is replaced under _lock by commit_write(); hold the lock
+        // only while copying the reference.
+        std::lock_guard<std::mutex> lg(_lock);
+        if (cid < _columns.size()) {
+            cl = _columns[cid];
+        }
+    }
+    // A null reference means the column does not exist. (The check used to be
+    // inverted, rejecting valid columns and dereferencing null for missing ones.)
+    if (cl.get() == nullptr) {
+        return Status::NotFound("column not found");
+    }
+    return cl->create_reader(version, reader);
+}
+
+// Hand out a reference to the currently published hash index.
+// commit_write() replaces _index while holding _lock, so the copy is taken
+// under the same lock (as read_column() does for _columns).
+Status MemSubTablet::get_index_to_read(scoped_refptr<HashIndex>* index) {
+    std::lock_guard<std::mutex> lg(_lock);
+    *index = _index;
+    return Status::OK();
+}
+
+// Open an exclusive write session using `*schema`.
+// Fails with InternalError if a previous session was begun and never reached
+// the end of commit_write() (the only place _schema is cleared).
+// Writers for key columns are created eagerly because
+// apply_partial_row_batch() needs the key writer for every row; writers for
+// other columns are created lazily on first use.
+Status MemSubTablet::begin_write(scoped_refptr<Schema>* schema) {
+    if (_schema != nullptr) {
+        return Status::InternalError("Another write is in-progress or error occurred");
+    }
+    _schema = *schema;
+    _row_size = latest_size();
+    _write_index = _index;
+    _writers.clear();
+    _writers.resize(_columns.size());
+    // precache key columns
+    // _writers was just reset, so a slot is null until its writer is created;
+    // create the writer when the slot is still empty.
+    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
+        uint32_t cid = _schema->get(i)->cid();
+        if (_writers[cid] == nullptr) {
+            RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+        }
+    }
+    _temp_hash_entries.reserve(8);
+
+    // setup stats
+    _write_start = GetMonoTimeSecondsAsDouble();
+    _num_insert = 0;
+    _num_update = 0;
+    _num_update_cell = 0;
+    return Status::OK();
+}
+
+// Apply every row of `batch` to the write session opened by begin_write().
+// The first cell of each row is treated as the row key:
+//   - key not found in the hash index: all cells are inserted as a new row at
+//     rowid _row_size and the index is updated (and rebuilt with a larger
+//     capacity when it reports need_rebuild());
+//   - key found: the remaining non-key cells are applied as updates.
+// Returns the first error reported by the batch or by a column writer.
+Status MemSubTablet::apply_partial_row_batch(PartialRowBatch* batch) {
+    // Sentinel rowid meaning "key not present"; typed so the comparisons below
+    // do not mix signed and unsigned operands.
+    const uint32_t kRidNotFound = static_cast<uint32_t>(-1);
+    while (true) {
+        bool has_row = false;
+        RETURN_IF_ERROR(batch->next_row(&has_row));
+        if (!has_row) {
+            break;
+        }
+        DCHECK_GE(batch->cur_row_cell_size(), 1);
+        const ColumnSchema* dsc = nullptr;
+        const void* key = nullptr;
+        // get key column and find in hash index
+        // TODO: support multi-column row key
+        // The status must be checked: on failure dsc/key are not filled in.
+        RETURN_IF_ERROR(batch->cur_row_get_cell(0, &dsc, &key));
+        // The key writer lives at cid 1 and is created by begin_write()
+        if (_writers.size() <= 1 || _writers[1] == nullptr) {
+            return Status::InternalError("key column writer not initialized");
+        }
+        ColumnWriter* keyw = _writers[1].get();
+        // find candidate rowids, and check equality
+        uint64_t hashcode = keyw->hashcode(key, 0);
+        _temp_hash_entries.clear();
+        uint32_t newslot = _write_index->find(hashcode, &_temp_hash_entries);
+        uint32_t rid = kRidNotFound;
+        for (size_t i = 0; i < _temp_hash_entries.size(); i++) {
+            uint32_t test_rid = _temp_hash_entries[i];
+            if (keyw->equals(test_rid, key, 0)) {
+                rid = test_rid;
+                break;
+            }
+        }
+        // if rowkey not found, do insertion/append
+        if (rid == kRidNotFound) {
+            rid = _row_size;
+            // add all columns
+            //DLOG(INFO) << StringPrintf"insert rid=%u", rid);
+            for (size_t i = 0; i < batch->cur_row_cell_size(); i++) {
+                const void* data;
+                RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data));
+                uint32_t cid = dsc->cid();
+                if (_writers[cid] == nullptr) {
+                    RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+                }
+                RETURN_IF_ERROR(_writers[cid]->insert(rid, data));
+            }
+            _write_index->set(newslot, hashcode, rid);
+            _row_size++;
+            if (_write_index->need_rebuild()) {
+                scoped_refptr<HashIndex> new_index;
+                // TODO: trace memory usage
+                size_t new_capacity = _row_size * 2;
+                // Retry with a larger capacity until the rebuild succeeds
+                while (true) {
+                    new_index = rebuild_hash_index(new_capacity);
+                    if (new_index.get() != nullptr) {
+                        break;
+                    } else {
+                        new_capacity += 1 << 16;
+                    }
+                }
+                _write_index = new_index;
+            }
+            _num_insert++;
+        } else {
+            // rowkey found, do update
+            // add non-key columns
+            for (size_t i = 1; i < batch->cur_row_cell_size(); i++) {
+                const void* data;
+                RETURN_IF_ERROR(batch->cur_row_get_cell(i, &dsc, &data));
+                uint32_t cid = dsc->cid();
+                // cids up to num_key_columns() are key columns and are skipped
+                if (cid > _schema->num_key_columns()) {
+                    if (_writers[cid] == nullptr) {
+                        RETURN_IF_ERROR(_columns[cid]->create_writer(&_writers[cid]));
+                    }
+                    RETURN_IF_ERROR(_writers[cid]->update(rid, data));
+                }
+            }
+            _num_update++;
+            _num_update_cell += batch->cur_row_cell_size() - 1;
+        }
+    }
+    return Status::OK();
+}
+
+// Finish the write session opened by begin_write() and publish it as `version`.
+// Steps: finalize every active column writer; then, under _lock, install the
+// write-side hash index, take the new column from each active writer and
+// append the (version, row count) entry; finally reset the per-session state
+// and log the session statistics.
+// NOTE: an early error return leaves _schema set, so later begin_write()
+// calls are rejected.
+Status MemSubTablet::commit_write(uint64_t version) {
+    for (size_t cid = 0; cid < _writers.size(); cid++) {
+        if (_writers[cid] != nullptr) {
+            // Should not fail in normal cases, fatal error if commit failed
+            RETURN_IF_ERROR(_writers[cid]->finalize(version));
+        }
+    }
+    {
+        // Readers copy _index/_columns under the same lock
+        std::lock_guard<std::mutex> lg(_lock);
+        if (_index != _write_index) {
+            _index = _write_index;
+        }
+        for (size_t cid = 0; cid < _writers.size(); cid++) {
+            if (_writers[cid] != nullptr) {
+                // Should not fail in normal cases, fatal error if commit failed
+                RETURN_IF_ERROR(_writers[cid]->get_new_column(&_columns[cid]));
+            }
+        }
+        _versions.emplace_back(version, _row_size);
+    }
+    // Clearing _schema is what allows the next begin_write() to proceed
+    _write_index.reset();
+    _writers.clear();
+    _schema = nullptr;
+    LOG(INFO) << StringPrintf("commit writetxn(insert=%zu update=%zu 
update_cell=%zu) %.3lfs",
+                              _num_insert, _num_update, _num_update_cell,
+                              GetMonoTimeSecondsAsDouble() - _write_start);
+    return Status::OK();
+}
+
+// Build a fresh HashIndex of `new_capacity` containing rowids [0, _row_size),
+// hashing each row's key as read back through the key column writer (cid 1).
+// Returns a null reference if HashIndex::add() rejects an entry; the caller
+// (apply_partial_row_batch) then retries with a larger capacity.
+// Elapsed time is logged in both outcomes.
+scoped_refptr<HashIndex> MemSubTablet::rebuild_hash_index(size_t new_capacity) 
{
+    double t0 = GetMonoTimeSecondsAsDouble();
+    ColumnWriter* keyw = _writers[1].get();
+    scoped_refptr<HashIndex> hi(new HashIndex(new_capacity));
+    for (size_t i = 0; i < _row_size; i++) {
+        const void* data = keyw->get(i);
+        DCHECK(data);
+        uint64_t hashcode = keyw->hashcode(data, 0);
+        if (!hi->add(hashcode, i)) {
+            double t1 = GetMonoTimeSecondsAsDouble();
+            LOG(INFO) << StringPrintf("Rebuild hash index %zu failed time: 
%.3lfs, expand",
+                                      new_capacity, t1 - t0);
+            // Empty reference signals failure
+            return scoped_refptr<HashIndex>();
+        }
+    }
+    double t1 = GetMonoTimeSecondsAsDouble();
+    LOG(INFO) << StringPrintf("Rebuild hash index %zu time: %.3lfs", 
new_capacity, t1 - t0);
+    return hi;
+}
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/mem_sub_tablet.h 
b/be/src/olap/memory/mem_sub_tablet.h
new file mode 100644
index 0000000..68ddc3d
--- /dev/null
+++ b/be/src/olap/memory/mem_sub_tablet.h
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+class HashIndex;
+class ColumnReader;
+class PartialRowBatch;
+class Column;
+class ColumnWriter;
+
+// A MemTablet can contain multiple MemSubTablets (currently only one).
+// A MemSubTablet holds a HashIndex and a collection of columns.
+// It supports a single writer running concurrently with multiple readers.
+//
+// Example read usage:
+// std::unique_ptr<ColumnReader> reader;
+// sub_tablet->read_column(version, cid, &reader);
+// // read(scan/get) cells using reader
+//
+// Example write usage:
+// WriteTxn* wtxn;
+// MemSubTablet* sub_tablet;
+// sub_tablet->begin_write(&current_schema);
+// for (size_t i = 0; i < wtxn->batch_size(); i++) {
+//     auto batch = wtxn->get_batch(i);
+//     RETURN_IF_ERROR(sub_tablet->apply_partial_row_batch(batch));
+// }
+// sub_tablet->commit_write(version);
+class MemSubTablet {
+public:
+    // Create a new empty MemSubTablet, with the specified schema and initial version
+    static Status create(uint64_t version, const Schema& schema,
+                         std::unique_ptr<MemSubTablet>* ret);
+
+    // Destructor
+    ~MemSubTablet();
+
+    // Return the number of rows of the latest version, including rows marked as deleted.
+    // Note: reads _versions without taking _lock.
+    size_t latest_size() const { return _versions.back().size; }
+
+    // Return the number of rows of the specified version, including rows marked as deleted
+    Status get_size(uint64_t version, size_t* size) const;
+
+    // Read the column specified by column id (cid) at the given version; returns a column reader
+    Status read_column(uint64_t version, uint32_t cid, 
std::unique_ptr<ColumnReader>* reader);
+
+    // Get a reference to the hash index for reading
+    Status get_index_to_read(scoped_refptr<HashIndex>* index);
+
+    // Start an exclusive write batch
+    // Note: caller should make sure schema is valid during write
+    Status begin_write(scoped_refptr<Schema>* schema);
+
+    // Apply all rows of a partial row batch to this MemSubTablet
+    Status apply_partial_row_batch(PartialRowBatch* batch);
+
+    // Finalize the whole write batch, with specified version
+    Status commit_write(uint64_t version);
+
+private:
+    // Construction goes through create()
+    MemSubTablet();
+    // Build a new hash index with the given capacity from the rows written so far;
+    // returns a null reference on failure
+    scoped_refptr<HashIndex> rebuild_hash_index(size_t new_capacity);
+
+    // Guards _index, _columns and _versions where they are read or published
+    mutable std::mutex _lock;
+    scoped_refptr<HashIndex> _index;
+    // One entry per committed version: the version number and the row count at that version
+    struct VersionInfo {
+        VersionInfo(uint64_t version, uint64_t size) : version(version), 
size(size) {}
+        uint64_t version = 0;
+        uint64_t size = 0;
+    };
+    std::vector<VersionInfo> _versions;
+    // Map from cid to column
+    std::vector<scoped_refptr<Column>> _columns;
+
+    // Temporary write state variables
+    // Non-null while a write session is open (set by begin_write, cleared by commit_write)
+    scoped_refptr<Schema> _schema;
+    // Row count of the in-progress write session
+    size_t _row_size = 0;
+    // If a copy-on-write is performed on HashIndex, this variable holds
+    // the new reference, otherwise it holds the same reference as _index
+    scoped_refptr<HashIndex> _write_index;
+    // Map from cid to current writers
+    std::vector<std::unique_ptr<ColumnWriter>> _writers;
+    // Temporary variable to reuse hash entry vector
+    std::vector<uint32_t> _temp_hash_entries;
+    // Write stats
+    double _write_start = 0;
+    size_t _num_insert = 0;
+    size_t _num_update = 0;
+    size_t _num_update_cell = 0;
+
+    DISALLOW_COPY_AND_ASSIGN(MemSubTablet);
+};
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.cpp 
b/be/src/olap/memory/mem_tablet.cpp
index 5c2edd8..a75e30b 100644
--- a/be/src/olap/memory/mem_tablet.cpp
+++ b/be/src/olap/memory/mem_tablet.cpp
@@ -17,13 +17,46 @@
 
 #include "olap/memory/mem_tablet.h"
 
+#include "olap/memory/mem_sub_tablet.h"
+#include "olap/memory/write_txn.h"
+
 namespace doris {
 namespace memory {
 
 MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
-        : BaseTablet(tablet_meta, data_dir) {}
+        : BaseTablet(tablet_meta, data_dir) {
+    _mem_schema.reset(new Schema(_schema));
+}
 
 MemTablet::~MemTablet() {}
 
+// Factory wrapper: constructs a shared MemTablet from the given meta and data dir.
+std::shared_ptr<MemTablet> MemTablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
+                                                              DataDir* data_dir) {
+    std::shared_ptr<MemTablet> tablet = std::make_shared<MemTablet>(tablet_meta, data_dir);
+    return tablet;
+}
+
+// Create the single sub-tablet, starting at version 0 with the in-memory schema.
+Status MemTablet::init() {
+    const Schema& mem_schema = *_mem_schema.get();
+    return MemSubTablet::create(0, mem_schema, &_sub_tablet);
+}
+
+// Placeholder: scanning is not implemented yet, so this always returns
+// NotSupported and touches neither `spec` nor `scan`.
+Status MemTablet::scan(std::unique_ptr<ScanSpec>&& spec, 
std::unique_ptr<MemTabletScan>* scan) {
+    return Status::NotSupported("scan not supported");
+}
+
+// Allocate a new WriteTxn bound to this tablet's in-memory schema and store
+// it in `*wtxn`, replacing whatever it held before.
+Status MemTablet::create_write_txn(std::unique_ptr<WriteTxn>* wtxn) {
+    WriteTxn* txn = new WriteTxn(&_mem_schema);
+    wtxn->reset(txn);
+    return Status::OK();
+}
+
+// Apply all batches of `wtxn` to the sub-tablet and publish them as `version`.
+// Commits are serialized by _write_lock. The first failing step aborts the
+// commit and its status is returned.
+Status MemTablet::commit_write_txn(WriteTxn* wtxn, uint64_t version) {
+    std::lock_guard<std::mutex> write_guard(_write_lock);
+    RETURN_IF_ERROR(_sub_tablet->begin_write(&_mem_schema));
+    for (size_t batch_idx = 0; batch_idx < wtxn->batch_size(); ++batch_idx) {
+        RETURN_IF_ERROR(_sub_tablet->apply_partial_row_batch(wtxn->get_batch(batch_idx)));
+    }
+    return _sub_tablet->commit_write(version);
+}
+
 } // namespace memory
 } // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/mem_tablet.h
index 7efc945..cfc8c3c 100644
--- a/be/src/olap/memory/mem_tablet.h
+++ b/be/src/olap/memory/mem_tablet.h
@@ -18,22 +18,67 @@
 #pragma once
 
 #include "olap/base_tablet.h"
+#include "olap/memory/schema.h"
 
 namespace doris {
 namespace memory {
 
+class MemSubTablet;
+class ScanSpec;
+class MemTabletScan;
+class WriteTxn;
+
 // Tablet class for memory-optimized storage engine.
 //
 // It stores all its data in-memory, and is designed for tables with
 // frequent updates.
 //
-// TODO: This is just a skeleton, will add implementation in the future.
+// By design, MemTablet stores all the schema versions together inside a single
+// MemTablet, while olap/Tablet generates a new Tablet after a schema change, so
+// their behaviors are not compatible. We will address this issue later, after
+// adding schema change support; currently MemTablet does not support schema
+// change (it only has a single version of the schema).
+//
+// TODO: will add more functionality as project evolves.
 class MemTablet : public BaseTablet {
 public:
+    // Factory wrapper around the constructor; data_dir defaults to nullptr.
+    static std::shared_ptr<MemTablet> 
create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
+                                                              DataDir* 
data_dir = nullptr);
+
     MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
+
     virtual ~MemTablet();
 
+    // Initialize
+    Status init();
+
+    // Scan the tablet and return a MemTabletScan object. The user can specify
+    // projections, predicates and aggregations using ScanSpec; currently only
+    // a full scan with projection is supported.
+    //
+    // Note: thread-safe, supports multi-reader concurrency.
+    Status scan(std::unique_ptr<ScanSpec>&& spec, 
std::unique_ptr<MemTabletScan>* scan);
+
+    // Create a write transaction
+    //
+    // Note: Thread-safe, can have multiple writetxn at the same time.
+    Status create_write_txn(std::unique_ptr<WriteTxn>* wtxn);
+
+    // Apply a write transaction and commit as the specified version
+    //
+    // Note: commit is done sequentially, protected by internal write lock
+    Status commit_write_txn(WriteTxn* wtxn, uint64_t version);
+
 private:
+    // memory::Schema is used internally rather than TabletSchema, so we need an
+    // extra copy of _schema with type memory::Schema.
+    scoped_refptr<Schema> _mem_schema;
+
+    // TODO: support multiple sub-tablets in the future
+    std::unique_ptr<MemSubTablet> _sub_tablet;
+
+    // Serializes commit_write_txn() calls
+    std::mutex _write_lock;
+
     DISALLOW_COPY_AND_ASSIGN(MemTablet);
 };
 
diff --git a/be/src/olap/memory/partial_row_batch.cpp 
b/be/src/olap/memory/partial_row_batch.cpp
new file mode 100644
index 0000000..b7d2b5e
--- /dev/null
+++ b/be/src/olap/memory/partial_row_batch.cpp
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/partial_row_batch.h"
+
+#include "util/bitmap.h"
+
+namespace doris {
+namespace memory {
+
+// Methods for PartialRowBatch
+
+// Keeps a reference to `*schema`, records the schema's cid_size() as the width
+// of the per-row "cell is set" bit vector, and reserves one cell slot per column.
+PartialRowBatch::PartialRowBatch(scoped_refptr<Schema>* schema)
+        : _schema(*schema), _bit_set_size(_schema->cid_size()) {
+    _cells.reserve(_schema->num_columns());
+}
+
+// Empty body: no explicit cleanup is performed here.
+PartialRowBatch::~PartialRowBatch() {}
+
+// Take ownership of a serialized batch and position the cursor on its first row.
+// The buffer starts with an 8-byte row count (written by
+// PartialRowWriter::finish_batch) followed by the serialized rows.
+// Returns InvalidArgument, leaving this object and `buffer` untouched, when
+// the buffer is too small to hold the row count.
+Status PartialRowBatch::load(std::vector<uint8_t>&& buffer) {
+    if (buffer.size() < sizeof(uint64_t)) {
+        return Status::InvalidArgument("PartialRowBatch buffer too small");
+    }
+    _buffer = std::move(buffer);
+    _pos = _buffer.data();
+    _row_size = *reinterpret_cast<const uint64_t*>(_pos);
+    _pos += sizeof(uint64_t);
+    _next_row = 0;
+    return Status::OK();
+}
+
+// Decode the next serialized row into _cells.
+// Sets *has_row to false (and returns OK) once all rows have been consumed.
+// Returns InternalError when the row header, bit vector or row length is
+// inconsistent with the buffer, and NotSupported for variable-length columns.
+Status PartialRowBatch::next_row(bool* has_row) {
+    DCHECK_LE(_next_row, _row_size);
+    *has_row = false;
+    if (_next_row == _row_size) {
+        return Status::OK();
+    }
+    const uint8_t* buf_end = _buffer.data() + _buffer.size();
+    DCHECK_LE(_pos, buf_end);
+    // Bytes left in the buffer. Checked explicitly because DCHECKs are compiled
+    // out of release builds and the buffer content is produced elsewhere.
+    const size_t remaining = (_pos <= buf_end) ? static_cast<size_t>(buf_end - _pos) : 0;
+    // A row needs at least its 4-byte length and the 2-byte bit vector size
+    if (remaining < sizeof(uint32_t) + sizeof(uint16_t)) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    _cells.clear();
+    uint32_t row_bsize = *reinterpret_cast<const uint32_t*>(_pos);
+    _pos += sizeof(uint32_t);
+    if (row_bsize > remaining - sizeof(uint32_t)) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    const uint8_t* cur = _pos;
+    size_t bit_all_size = *reinterpret_cast<const uint16_t*>(cur);
+    cur += 2;
+    DCHECK_LE(bit_all_size, 65535);
+    // The bit vector must be non-empty, lie inside the row and cover every
+    // "cell is set" bit before any bit is tested.
+    if (bit_all_size == 0 || 2 + bit_all_size > row_bsize || bit_all_size * 8 < _bit_set_size) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    const uint8_t* bitvec = cur;
+    cur += bit_all_size;
+    size_t cur_nullable_idx = _bit_set_size;
+    // Bit 0 is the delete marker. Assign (rather than only set) the flag so a
+    // deleted row does not leave later rows marked as deleted.
+    _delete = BitmapTest(bitvec, 0);
+    size_t cid = 1;
+    while (BitmapFindFirstSet(bitvec, cid, _bit_set_size, &cid)) {
+        const ColumnSchema* cs = _schema->get_by_cid(cid);
+        DCHECK(cs);
+        if (cs->is_nullable()) {
+            // Null bits follow the "set" bits, one per nullable cell present
+            if (cur_nullable_idx >= bit_all_size * 8) {
+                return Status::InternalError("PartialRowBatch data corruption");
+            }
+            if (BitmapTest(bitvec, cur_nullable_idx)) {
+                // is null
+                _cells.emplace_back(cid, nullptr);
+            } else {
+                // not null
+                _cells.emplace_back(cid, cur);
+            }
+            cur_nullable_idx++;
+        } else {
+            _cells.emplace_back(cid, cur);
+        }
+        const uint8_t* pdata = _cells.back().data;
+        if (pdata != nullptr) {
+            size_t bsize = _schema->get_column_byte_size(cid);
+            if (bsize == 0) {
+                return Status::NotSupported("varlen column type not supported");
+                // size_t sz = *(uint16_t*)cur;
+                // cur += (sz + 2);
+            } else {
+                cur += bsize;
+            }
+        }
+        cid++;
+    }
+    // The decoded cells must consume exactly the advertised row length
+    if (_pos + row_bsize != cur) {
+        return Status::InternalError("PartialRowBatch data corruption");
+    }
+    _pos = cur;
+    *has_row = true;
+    _next_row++;
+    return Status::OK();
+}
+
+// Fetch the idx-th decoded cell of the current row: its column schema goes to
+// `*cs` and its data pointer (nullptr for a null value) to `*data`.
+// Returns InvalidArgument when idx is past the last decoded cell.
+Status PartialRowBatch::cur_row_get_cell(size_t idx, const ColumnSchema** cs,
+                                         const void** data) const {
+    if (idx < _cells.size()) {
+        auto& entry = _cells[idx];
+        *cs = _schema->get_by_cid(entry.cid);
+        *data = entry.data;
+        return Status::OK();
+    }
+    return Status::InvalidArgument("get_cell: idx exceed cells size");
+}
+
+// Methods for PartialRowWriter
+
+// Keeps a reference to `*schema`, records cid_size() as the width of the
+// "cell is set" bit vector, starts with no nullable cells, and allocates one
+// temporary cell slot per cid.
+PartialRowWriter::PartialRowWriter(scoped_refptr<Schema>* schema)
+        : _schema(*schema), _bit_set_size(_schema->cid_size()), 
_bit_nullable_size(0) {
+    _temp_cells.resize(_schema->cid_size());
+}
+
+// Begin a new batch limited to `row_capacity` rows, reserving `byte_capacity`
+// bytes for the output buffer. The buffer is sized to 8 bytes up front; they
+// are the slot finish_batch() fills with the row count.
+Status PartialRowWriter::start_batch(size_t row_capacity, size_t byte_capacity) {
+    _row_capacity = row_capacity;
+    _row_size = 0;
+    // reserve space for _row_size
+    _buffer.resize(sizeof(uint64_t));
+    _buffer.reserve(byte_capacity);
+    return Status::OK();
+}
+
+// Empty body: no explicit cleanup is performed here.
+PartialRowWriter::~PartialRowWriter() {}
+
+// Begin a new row: zero the nullable-cell counter and all temporary cell
+// slots. Returns InvalidArgument once the batch already holds
+// _row_capacity rows.
+Status PartialRowWriter::start_row() {
+    if (_row_size < _row_capacity) {
+        _bit_nullable_size = 0;
+        memset(_temp_cells.data(), 0, sizeof(CellInfo) * _temp_cells.size());
+        return Status::OK();
+    }
+    return Status::InvalidArgument("over capacity");
+}
+
+// Serialize the cells set since start_row() and append them to the batch
+// buffer as: 4-byte row length | serialized row.
+// Returns InvalidArgument when the row count or the reserved byte capacity
+// would be exceeded, or the error from write(); on a write() error the buffer
+// is rolled back to its previous size.
+Status PartialRowWriter::end_row() {
+    if (_row_size >= _row_capacity) {
+        return Status::InvalidArgument("over capacity");
+    }
+    const size_t row_byte_size = byte_size();
+    const size_t old_size = _buffer.size();
+    const size_t new_size = old_size + sizeof(uint32_t) + row_byte_size;
+    // Never grow past the capacity reserved by start_batch()
+    if (new_size > _buffer.capacity()) {
+        return Status::InvalidArgument("over capacity");
+    }
+    _buffer.resize(new_size);
+    uint8_t* pos = _buffer.data() + old_size;
+    *reinterpret_cast<uint32_t*>(pos) = static_cast<uint32_t>(row_byte_size);
+    pos += sizeof(uint32_t);
+    Status st = write(&pos);
+    if (!st.ok()) {
+        // write() may stop part-way, so the position invariant below only
+        // holds on success; check the status first so a legitimate error is
+        // returned instead of tripping the DCHECK in debug builds.
+        _buffer.resize(old_size);
+        return st;
+    }
+    DCHECK_EQ(pos, _buffer.data() + new_size);
+    _row_size++;
+    return Status::OK();
+}
+
+// Record `data` as the value of column `cid` for the row being built.
+// A null `data` is only accepted for nullable columns; otherwise
+// InvalidArgument is returned and nothing is recorded.
+Status PartialRowWriter::set(const ColumnSchema* cs, uint32_t cid, const void* data) {
+    const bool nullable = cs->is_nullable();
+    if (!nullable && data == nullptr) {
+        return Status::InvalidArgument("not nullable column set to null");
+    }
+    auto& cell = _temp_cells[cid];
+    // Count each nullable cell once, even if it is set repeatedly
+    if (nullable && !cell.isnullable) {
+        _bit_nullable_size++;
+    }
+    cell.isnullable = nullable;
+    cell.isset = 1;
+    cell.data = reinterpret_cast<const uint8_t*>(data);
+    return Status::OK();
+}
+
+// Set a cell by column name; NotFound if the schema has no such column.
+Status PartialRowWriter::set(const string& col, const void* data) {
+    auto col_schema = _schema->get_by_name(col);
+    if (col_schema != nullptr) {
+        return set(col_schema, col_schema->cid(), data);
+    }
+    return Status::NotFound("col name not found");
+}
+
+// Set a cell by column id; NotFound if the schema has no such cid.
+Status PartialRowWriter::set(uint32_t cid, const void* data) {
+    auto col_schema = _schema->get_by_cid(cid);
+    if (col_schema != nullptr) {
+        return set(col_schema, col_schema->cid(), data);
+    }
+    return Status::NotFound("cid not found");
+}
+
+// Placeholder for marking the current row as a delete; not implemented, so it
+// always returns NotSupported. The commented-out line shows the intended
+// marker (cell slot 0).
+Status PartialRowWriter::set_delete() {
+    // TODO: support delete
+    // _temp_cells[0].isset = 1;
+    return Status::NotSupported("delete not supported");
+}
+
+// Serialized size in bytes of the row being built: 2-byte bit vector length,
+// the bit vector itself, then every non-null cell's fixed-width payload.
+// Variable-length columns are not supported and trigger LOG(FATAL).
+size_t PartialRowWriter::byte_size() const {
+    // TODO: support delete
+    const size_t bitvec_bytes = num_block(_bit_set_size + _bit_nullable_size, 8);
+    size_t total = 2 + bitvec_bytes;
+    for (size_t cid = 1; cid < _temp_cells.size(); ++cid) {
+        if (_temp_cells[cid].data == nullptr) {
+            continue;
+        }
+        const size_t cell_bytes = _schema->get_column_byte_size(cid);
+        if (cell_bytes != 0) {
+            total += cell_bytes;
+        } else {
+            LOG(FATAL) << "varlen column type not supported";
+            //data_size += 2 + reinterpret_cast<Slice*>(_temp_cells[i].data)->size();
+        }
+    }
+    return total;
+}
+
+// Serialize the row being built at `*ppos`, advancing `*ppos` past the bytes
+// written. Layout: 2-byte bit vector length | bit vector ("set" bits indexed
+// by cid, followed by one null bit per nullable cell that is set) | payloads
+// of the non-null cells in cid order.
+// Returns NotSupported for too many columns or a variable-length column, and
+// InvalidArgument when a key column has not been set. On error `*ppos` may
+// have been advanced only part-way.
+Status PartialRowWriter::write(uint8_t** ppos) {
+    size_t bit_all_size = num_block(_bit_set_size + _bit_nullable_size, 8);
+    // The length prefix is a uint16_t
+    if (bit_all_size >= 65536) {
+        return Status::NotSupported("too many columns");
+    }
+    // using reference is more convenient
+    uint8_t*& pos = *ppos;
+    *reinterpret_cast<uint16_t*>(pos) = (uint16_t)bit_all_size;
+    pos += 2;
+    uint8_t* bitvec = pos;
+    pos += bit_all_size;
+    memset(bitvec, 0, bit_all_size);
+    if (_temp_cells[0].isset) {
+        // deleted
+        BitmapSet(bitvec, 0);
+    }
+    // Null bits start right after the _bit_set_size "set" bits
+    size_t cur_nullable_idx = _bit_set_size;
+    for (size_t i = 1; i < _temp_cells.size(); i++) {
+        if (_temp_cells[i].isset) {
+            BitmapSet(bitvec, i);
+            if (_temp_cells[i].isnullable) {
+                if (_temp_cells[i].data == nullptr) {
+                    BitmapSet(bitvec, cur_nullable_idx);
+                }
+                cur_nullable_idx++;
+            }
+            const uint8_t* pdata = _temp_cells[i].data;
+            if (pdata != nullptr) {
+                size_t bsize = _schema->get_column_byte_size(i);
+                if (bsize == 0) {
+                    return Status::NotSupported("varlen column type not 
supported");
+                    // Some incomplete code to write string(Slice), may be useful in future
+                    // size_t sz = ((Slice*)pdata)->size();
+                    // *(uint16_t*)pos = (uint16_t)sz;
+                    // pos += 2;
+                    // memcpy(pos, ((Slice*)pdata)->data(), sz);
+                    // pos += sz;
+                } else {
+                    memcpy(pos, _temp_cells[i].data, bsize);
+                    pos += bsize;
+                }
+            }
+        } else if (i <= _schema->num_key_columns()) {
+            // cids 1..num_key_columns() are key columns and must always be present
+            return Status::InvalidArgument("build without key columns");
+        }
+    }
+    return Status::OK();
+}
+
+// Finish the batch: store the row count in the 8-byte slot at the start of the
+// buffer, then swap the buffer into `*buffer` and reset the row counter.
+// Returns InvalidArgument, leaving `*buffer` untouched, when the internal
+// buffer does not even hold the row-count slot (start_batch() sizes it), since
+// writing the count would then go out of bounds.
+Status PartialRowWriter::finish_batch(vector<uint8_t>* buffer) {
+    if (_buffer.size() < sizeof(uint64_t)) {
+        return Status::InvalidArgument("finish_batch called without start_batch");
+    }
+    *reinterpret_cast<uint64_t*>(_buffer.data()) = _row_size;
+    _buffer.swap(*buffer);
+    _row_size = 0;
+    return Status::OK();
+}
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/partial_row_batch.h 
b/be/src/olap/memory/partial_row_batch.h
new file mode 100644
index 0000000..893f865
--- /dev/null
+++ b/be/src/olap/memory/partial_row_batch.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/memory/common.h"
+#include "olap/memory/schema.h"
+
+namespace doris {
+namespace memory {
+
+// A chunk of memory that stores a batch of serialized partial rows.
+// User can iterate through all the partial rows, get each partial row's cells.
+//
+// Serialization format for a batch:
+// 4 byte len | serialized partial row
+// 4 byte len | serialized partial row
+// ...
+// 4 byte len | serialized partial row
+//
+// NOTE(review): PartialRowWriter::finish_batch() also stores the row count as
+// a uint64 at the very start of the buffer; that header is not listed above.
+//
+// Serialization format for a partial row
+// bit vector (set + null) byte size (2 bytes) |
+// bit vector mark set cells |
+// bit vector mark nullable cells' null value |
+// 8bit padding
+// serialized not null cells
+//
+// Example usage:
+// PartialRowBatch rb(&schema);
+// rb.load(std::move(buffer));
+// while (true) {
+//     bool has;
+//     rb.next_row(&has);
+//     if (!has) break;
+//     for (size_t j = 0; j < rb.cur_row_cell_size(); j++) {
+//         const ColumnSchema* cs = nullptr;
+//         const void* data = nullptr;
+//         // get column cell type and data
+//         rb.cur_row_get_cell(j, &cs, &data);
+//     }
+// }
+//
+// Note: currently only fixed length column types are supported. All length
+// and scalar types are stored in native byte order (little endian on x86-64).
+//
+// Note: The serialization format is simple, it only provides basic
+// functionalities so we can quickly complete the whole create/read/write
+// pipeline. The format may change as the project evolves.
+class PartialRowBatch {
+public:
+    // `schema` points to the scoped_refptr of the schema the rows conform to
+    explicit PartialRowBatch(scoped_refptr<Schema>* schema);
+    ~PartialRowBatch();
+
+    // Schema this batch was created with
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Load from a serialized buffer; the buffer is passed as an rvalue, so
+    // callers hand it over with std::move
+    Status load(std::vector<uint8_t>&& buffer);
+
+    // Return row count in this batch
+    size_t row_size() const { return _row_size; }
+
+    // Iterate to next row, set *has_row to false if there are no more rows
+    Status next_row(bool* has_row);
+
+    // Get the number of cells of the current row
+    size_t cur_row_cell_size() const { return _cells.size(); }
+    // Get cell of the current row by index idx, return its ColumnSchema and
+    // data pointer; the unit test expects a nullptr data pointer for a null cell
+    Status cur_row_get_cell(size_t idx, const ColumnSchema** cs, const void** 
data) const;
+
+private:
+    scoped_refptr<Schema> _schema;
+
+    // presumably true when the current row is a delete operation -- TODO confirm
+    bool _delete = false;
+    size_t _bit_set_size = 0;
+    // One entry per cell of the current row: column id and pointer to its data
+    struct CellInfo {
+        CellInfo(uint32_t cid, const void* data)
+                : cid(cid), data(reinterpret_cast<const uint8_t*>(data)) {}
+        uint32_t cid = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _cells;
+
+    size_t _next_row = 0;
+    // Row count of the batch, as returned by row_size()
+    size_t _row_size = 0;
+    const uint8_t* _pos = nullptr;
+    std::vector<uint8_t> _buffer;
+};
+
+// Writer for PartialRowBatch
+//
+// Example usage:
+// scoped_refptr<Schema> sc;
+// Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+// PartialRowWriter writer(&sc);
+// writer.start_batch();
+// for (auto& row : rows) {
+//     writer.start_row();
+//     writer.set("column_name", value);
+//     ...
+//     writer.set(column_id, value);
+//     writer.end_row();
+// }
+// vector<uint8_t> buffer;
+// writer.finish_batch(&buffer);
+class PartialRowWriter {
+public:
+    // Default capacities used by start_batch()
+    static const size_t DEFAULT_BYTE_CAPACITY = 1 << 20;
+    static const size_t DEFAULT_ROW_CAPACIT = 1 << 16;
+
+    // `schema` points to the scoped_refptr of the schema rows are written for
+    explicit PartialRowWriter(scoped_refptr<Schema>* schema);
+    ~PartialRowWriter();
+
+    // Start writing a new batch
+    Status start_batch(size_t row_capacity = DEFAULT_ROW_CAPACIT,
+                       size_t byte_capacity = DEFAULT_BYTE_CAPACITY);
+
+    // Start writing a new row
+    Status start_row();
+
+    // Set cell value by column name
+    // A nullptr data marks a nullable column as null.
+    // param data's memory must remain valid until the row is serialized
+    // NOTE(review): the original comment said "before calling build";
+    // presumably this means end_row() -- confirm.
+    Status set(const string& col, const void* data);
+
+    // Set cell value by column id
+    // A nullptr data marks a nullable column as null.
+    // param data's memory must remain valid until the row is serialized
+    // (presumably by end_row() -- confirm)
+    Status set(uint32_t cid, const void* data);
+
+    // Mark this row as a delete operation
+    Status set_delete();
+
+    // Finish writing a row
+    Status end_row();
+
+    // Finish writing the whole PartialRowBatch, return a serialized buffer
+    Status finish_batch(vector<uint8_t>* buffer);
+
+private:
+    Status set(const ColumnSchema* cs, uint32_t cid, const void* data);
+    size_t byte_size() const;
+    // Serialize the pending cells of the current row at *ppos
+    Status write(uint8_t** ppos);
+
+    scoped_refptr<Schema> _schema;
+    // Pending state of one column for the row being written; indexed by
+    // column id, slot 0 is used as the delete marker
+    struct CellInfo {
+        CellInfo() = default;
+        uint32_t isset = 0;
+        uint32_t isnullable = 0;
+        const uint8_t* data = nullptr;
+    };
+    vector<CellInfo> _temp_cells;
+    size_t _bit_set_size = 0;
+    size_t _bit_nullable_size = 0;
+    // Row counter stored into the batch header by finish_batch()
+    size_t _row_size = 0;
+    size_t _row_capacity = 0;
+
+    std::vector<uint8_t> _buffer;
+};
+
+} // namespace memory
+} // namespace doris
diff --git a/be/src/olap/memory/schema.cpp b/be/src/olap/memory/schema.cpp
index cf9b3d3..d0b89c4 100644
--- a/be/src/olap/memory/schema.cpp
+++ b/be/src/olap/memory/schema.cpp
@@ -17,9 +17,50 @@
 
 #include "olap/memory/schema.h"
 
+#include "gutil/strings/split.h"
+
 namespace doris {
 namespace memory {
 
+// Returns true when `type` is one of the column types accepted here: the
+// fixed-width integer types, float, double and bool.
+bool supported(ColumnType type) {
+    return type == OLAP_FIELD_TYPE_TINYINT ||
+           type == OLAP_FIELD_TYPE_SMALLINT ||
+           type == OLAP_FIELD_TYPE_INT ||
+           type == OLAP_FIELD_TYPE_BIGINT ||
+           type == OLAP_FIELD_TYPE_LARGEINT ||
+           type == OLAP_FIELD_TYPE_FLOAT ||
+           type == OLAP_FIELD_TYPE_DOUBLE ||
+           type == OLAP_FIELD_TYPE_BOOL;
+}
+
+// Returns the in-memory width in bytes of a fixed-length column type, or 0
+// for any type not listed below.
+size_t get_type_byte_size(ColumnType type) {
+    switch (type) {
+    // Types are grouped by width.
+    case OLAP_FIELD_TYPE_TINYINT:
+    case OLAP_FIELD_TYPE_BOOL:
+        return 1;
+    case OLAP_FIELD_TYPE_SMALLINT:
+        return 2;
+    case OLAP_FIELD_TYPE_INT:
+    case OLAP_FIELD_TYPE_FLOAT:
+        return 4;
+    case OLAP_FIELD_TYPE_BIGINT:
+    case OLAP_FIELD_TYPE_DOUBLE:
+        return 8;
+    case OLAP_FIELD_TYPE_LARGEINT:
+        return 16;
+    default:
+        return 0;
+    }
+}
+
 ColumnSchema::ColumnSchema(const TabletColumn& tcolumn) : _tcolumn(tcolumn) {}
 
 ColumnSchema::ColumnSchema(uint32_t cid, const string& name, ColumnType type, 
bool nullable,
@@ -44,15 +85,46 @@ std::string ColumnSchema::debug_string() const {
 
 //////////////////////////////////////////////////////////////////////////////
 
+// Build a Schema from a textual description; utility method for tests.
+//
+// `desc` is a comma separated list of columns, each written as
+// "<name> <type>" or "<name> <type> null" (nullable), for example
+// "id int,uv int,pv int,city tinyint null". Columns get unique ids 1, 2, ...
+// in declaration order and only the first column is marked as key.
+//
+// Returns InvalidArgument if `sc` is null, if `desc` contains no column, or
+// if a column lacks a name or a type. On success `*sc` holds the new Schema.
+Status Schema::create(const string& desc, scoped_refptr<Schema>* sc) {
+    if (sc == nullptr) {
+        return Status::InvalidArgument("null schema output");
+    }
+    const std::vector<std::string> columns =
+            strings::Split(desc, ",", strings::SkipWhitespace());
+    if (columns.empty()) {
+        // A schema without columns cannot provide the single short key
+        // column configured below.
+        return Status::InvalidArgument("bad schema desc");
+    }
+    TabletSchemaPB tspb;
+    uint32_t cid = 1;
+    for (const std::string& column : columns) {
+        const std::vector<std::string> fields =
+                strings::Split(column, " ", strings::SkipWhitespace());
+        if (fields.size() < 2) {
+            return Status::InvalidArgument("bad schema desc");
+        }
+        ColumnPB* cpb = tspb.add_column();
+        cpb->set_is_key(cid == 1);
+        cpb->set_unique_id(cid++);
+        cpb->set_name(fields[0]);
+        cpb->set_type(fields[1]);
+        if (fields.size() == 3 && fields[2] == "null") {
+            cpb->set_is_nullable(true);
+        }
+    }
+    tspb.set_keys_type(KeysType::UNIQUE_KEYS);
+    tspb.set_next_column_unique_id(cid);
+    tspb.set_num_short_key_columns(1);
+    tspb.set_is_in_memory(false);
+    TabletSchema ts;
+    ts.init_from_pb(tspb);
+    sc->reset(new Schema(ts));
+    return Status::OK();
+}
+
+// Copies `tschema` and builds the per-column lookup tables: column id ->
+// ColumnSchema, column name -> ColumnSchema and column id -> byte size.
 Schema::Schema(const TabletSchema& tschema) : _tschema(tschema) {
+    // Start with a single slot (cid 0) and grow as larger cids are seen.
     _cid_size = 1;
     _cid_to_col.resize(_cid_size, nullptr);
+    _column_byte_sizes.resize(_cid_size, 0);
     for (size_t i = 0; i < num_columns(); i++) {
         const ColumnSchema* cs = get(i);
+        // Tables are indexed by cid, so they need max(cid) + 1 entries.
         _cid_size = std::max(_cid_size, cs->cid() + 1);
         _cid_to_col.resize(_cid_size, nullptr);
         _cid_to_col[cs->cid()] = cs;
         _name_to_col[cs->name()] = cs;
+        // Stays 0 for types get_type_byte_size() does not list.
+        _column_byte_sizes.resize(_cid_size, 0);
+        _column_byte_sizes[cs->cid()] = get_type_byte_size(cs->type());
     }
 }
 
diff --git a/be/src/olap/memory/schema.h b/be/src/olap/memory/schema.h
index ebe3571..9a41480 100644
--- a/be/src/olap/memory/schema.h
+++ b/be/src/olap/memory/schema.h
@@ -29,22 +29,36 @@ namespace memory {
 // Memory engine's column type, just use FieldType for now
 typedef FieldType ColumnType;
 
+// Return true if this ColumnType is supported
+bool supported(ColumnType type);
+
 // Memory engine's column schema, simple wrapper of TabletColumn.
 // TODO: Add more properties and methods later
 class ColumnSchema {
 public:
+    // Wrap a copy of an existing TabletColumn
     explicit ColumnSchema(const TabletColumn& tcolumn);
+    // Construct from individual column properties
     ColumnSchema(uint32_t cid, const string& name, ColumnType type, bool 
nullable, bool is_key);
+
+    // Get column id
     inline uint32_t cid() const { return 
static_cast<uint32_t>(_tcolumn.unique_id()); }
+
+    // Get column name
     inline std::string name() const { return _tcolumn.name(); }
+
+    // Get column type
     inline ColumnType type() const { return _tcolumn.type(); }
+
+    // Get is nullable
     inline bool is_nullable() const { return _tcolumn.is_nullable(); }
+
+    // Get is key
     inline bool is_key() const { return _tcolumn.is_key(); }
 
     std::string type_name() const;
     std::string debug_string() const;
 
 private:
+    // Note: do not add more fields to this class, it needs to stay identical
+    // to TabletColumn
     TabletColumn _tcolumn;
 };
 
@@ -55,25 +69,47 @@ private:
 // 2. in the future, there may be a special compound primary key column
 //    if primary-key has multiple columns
 // TODO: Add more properties and methods later
-class Schema {
+class Schema : public RefCountedThreadSafe<Schema> {
 public:
+    // Create schema by description string, utility method for test
+    static Status create(const string& desc, scoped_refptr<Schema>* sc);
+
+    // Build from a TabletSchema; a copy of `tschema` is kept in this object
     explicit Schema(const TabletSchema& tschema);
+
     std::string debug_string() const;
+
+    // Get number of columns
     inline size_t num_columns() const { return _tschema.num_columns(); }
+
+    // Get number of key columns
     inline size_t num_key_columns() const { return _tschema.num_key_columns(); 
}
 
+    // Get ColumnSchema by index
     const ColumnSchema* get(size_t idx) const;
 
+    // Get ColumnSchema by name
     const ColumnSchema* get_by_name(const string& name) const;
 
+    // Get column id space size
+    //
+    // For example:
+    // If a schema has 5 columns with ids [1, 2, 3, 5, 6],
+    // its cid_size equals max(cid)+1 = 7
     uint32_t cid_size() const;
+
+    // Get ColumnSchema by column id
     const ColumnSchema* get_by_cid(uint32_t cid) const;
 
+    // Get column type byte size by column id
+    // `cid` must be less than cid_size() (checked in debug builds only).
+    // A size of 0 means the column type has no known fixed width.
+    size_t get_column_byte_size(uint32_t cid) const {
+        DCHECK_LT(cid, _column_byte_sizes.size());
+        return _column_byte_sizes[cid];
+    }
+
 private:
     TabletSchema _tschema;
+    // The three tables below are filled by the constructor; the vectors are
+    // indexed by column id
     uint32_t _cid_size;
     std::unordered_map<string, const ColumnSchema*> _name_to_col;
     vector<const ColumnSchema*> _cid_to_col;
+    vector<size_t> _column_byte_sizes;
 };
 
 } // namespace memory
diff --git a/be/src/olap/memory/mem_tablet.cpp 
b/be/src/olap/memory/write_txn.cpp
similarity index 70%
copy from be/src/olap/memory/mem_tablet.cpp
copy to be/src/olap/memory/write_txn.cpp
index 5c2edd8..908a0c5 100644
--- a/be/src/olap/memory/mem_tablet.cpp
+++ b/be/src/olap/memory/write_txn.cpp
@@ -15,15 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "olap/memory/mem_tablet.h"
+#include "olap/memory/write_txn.h"
 
 namespace doris {
 namespace memory {
 
-MemTablet::MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
-        : BaseTablet(tablet_meta, data_dir) {}
+// Bind the transaction to the Schema held by `*schema`. `schema` must not be
+// null: it is dereferenced to obtain the Schema pointer stored in _schema.
+WriteTxn::WriteTxn(scoped_refptr<Schema>* schema) : _schema(schema->get()) {}
 
-MemTablet::~MemTablet() {}
+// Nothing to release by hand: the batches are owned through unique_ptr.
+WriteTxn::~WriteTxn() {}
+
+// Append a new PartialRowBatch bound to this transaction's schema and return
+// a pointer to it. The transaction keeps ownership of the batch.
+PartialRowBatch* WriteTxn::new_batch() {
+    // Wrap the allocation in a unique_ptr before it reaches the vector, so
+    // the batch is not leaked if growing the vector throws.
+    _batches.push_back(
+            std::unique_ptr<PartialRowBatch>(new PartialRowBatch(&_schema)));
+    return _batches.back().get();
+}
+
+// Return the batch at position `idx`. `idx` must be less than batch_size();
+// this is checked in debug builds only.
+PartialRowBatch* WriteTxn::get_batch(size_t idx) const {
+    DCHECK_LT(idx, _batches.size());
+    return _batches[idx].get();
+}
 
 } // namespace memory
 } // namespace doris
diff --git a/be/src/olap/memory/mem_tablet.h b/be/src/olap/memory/write_txn.h
similarity index 50%
copy from be/src/olap/memory/mem_tablet.h
copy to be/src/olap/memory/write_txn.h
index 7efc945..74cee3c 100644
--- a/be/src/olap/memory/mem_tablet.h
+++ b/be/src/olap/memory/write_txn.h
@@ -17,24 +17,41 @@
 
 #pragma once
 
-#include "olap/base_tablet.h"
+#include "olap/memory/common.h"
+#include "olap/memory/partial_row_batch.h"
+#include "olap/memory/schema.h"
 
 namespace doris {
 namespace memory {
 
-// Tablet class for memory-optimized storage engine.
+class PartialRowBatch;
+
+// Class for write transaction
 //
-// It stores all its data in-memory, and is designed for tables with
-// frequent updates.
+// Note: Currently it stores all its operations in memory, to make things
+// simple, so we can quickly complete the whole create/read/write pipeline.
+// The data structure may change as the project evolves.
 //
-// TODO: This is just a skeleton, will add implementation in the future.
-class MemTablet : public BaseTablet {
+// TODO: add write to/load from WriteTxn files in future.
+class WriteTxn {
 public:
-    MemTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
-    virtual ~MemTablet();
+    // `schema` points to the scoped_refptr of the schema all batches use
+    explicit WriteTxn(scoped_refptr<Schema>* schema);
+    ~WriteTxn();
+
+    // Schema this transaction was created with
+    const Schema& schema() const { return *_schema.get(); }
+
+    // Get number of batches
+    size_t batch_size() const { return _batches.size(); }
+
+    // Get batch by index
+    PartialRowBatch* get_batch(size_t idx) const;
+
+    // Add a new batch to this WriteTxn
+    PartialRowBatch* new_batch();
 
 private:
-    DISALLOW_COPY_AND_ASSIGN(MemTablet);
+    scoped_refptr<Schema> _schema;
+    // Batches in the order they were added by new_batch(); owned here
+    std::vector<std::unique_ptr<PartialRowBatch>> _batches;
 };
 
 } // namespace memory
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 96f1de4..2b49580 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -53,14 +53,6 @@ TabletSharedPtr 
Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
     return std::make_shared<Tablet>(tablet_meta, data_dir);
 }
 
-void Tablet::_gen_tablet_path() {
-    std::string path = _data_dir->path() + DATA_PREFIX;
-    path = path_util::join_path_segments(path, 
std::to_string(_tablet_meta->shard_id()));
-    path = path_util::join_path_segments(path, 
std::to_string(_tablet_meta->tablet_id()));
-    path = path_util::join_path_segments(path, 
std::to_string(_tablet_meta->schema_hash()));
-    _tablet_path = path;
-}
-
 Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) :
         BaseTablet(tablet_meta, data_dir),
         _is_bad(false),
@@ -69,7 +61,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* 
data_dir) :
         _last_cumu_compaction_success_millis(0),
         _last_base_compaction_success_millis(0),
         _cumulative_point(kInvalidCumulativePoint) {
-    _gen_tablet_path();
     _rs_graph.construct_rowset_graph(_tablet_meta->all_rs_metas());
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f8746f8..40cac13 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -78,7 +78,7 @@ public:
     inline KeysType keys_type() const;
     inline size_t num_columns() const;
     inline size_t num_null_columns() const;
-    inline size_t num_key_columns() const ;
+    inline size_t num_key_columns() const;
     inline size_t num_short_key_columns() const;
     inline size_t num_rows_per_row_block() const;
     inline CompressKind compress_kind() const;
@@ -233,7 +233,6 @@ private:
     OLAPStatus _contains_version(const Version& version);
     void _max_continuous_version_from_begining_unlocked(Version* version,
                                                         VersionHash* v_hash) 
const ;
-    void _gen_tablet_path();
     RowsetSharedPtr _rowset_with_largest_size();
     void _delete_inc_rowset_by_version(const Version& version, const 
VersionHash& version_hash);
     OLAPStatus _capture_consistent_rowsets_unlocked(const vector<Version>& 
version_path,
diff --git a/be/src/util/time.h b/be/src/util/time.h
index 01d56c6..5a4bfbb 100755
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -59,6 +59,10 @@ inline int64_t MonotonicSeconds() {
   return GetMonoTimeMicros() / MICROS_PER_SEC;
 }
 
+// Returns the value of GetMonoTimeMicros() converted to seconds, keeping the
+// fractional part.
+inline double GetMonoTimeSecondsAsDouble() {
+    const double micros = static_cast<double>(GetMonoTimeMicros());
+    return micros / MICROS_PER_SEC;
+}
+
 // Returns the time since the Epoch measured in microseconds.
 inline int64_t GetCurrentTimeMicros() {
   timespec ts;
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 48c34ce..f00d7b7 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -86,3 +86,4 @@ ADD_BE_TEST(memory/hash_index_test)
 ADD_BE_TEST(memory/column_delta_test)
 ADD_BE_TEST(memory/schema_test)
 ADD_BE_TEST(memory/column_test)
+ADD_BE_TEST(memory/partial_row_batch_test)
diff --git a/be/test/olap/memory/partial_row_batch_test.cpp 
b/be/test/olap/memory/partial_row_batch_test.cpp
new file mode 100644
index 0000000..354795c
--- /dev/null
+++ b/be/test/olap/memory/partial_row_batch_test.cpp
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/memory/partial_row_batch.h"
+
+#include <gtest/gtest.h>
+
+#include <vector>
+
+#include "util/hash_util.hpp"
+
+namespace doris {
+namespace memory {
+
+// Round-trip test: serialize N rows with PartialRowWriter, load them into a
+// PartialRowBatch and check every cell. Both phases seed rand() with 1 and
+// draw values in the same order, so the reader regenerates what was written.
+TEST(PartialRowbatch, write) {
+    scoped_refptr<Schema> sc;
+    ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null",
+                               &sc).ok());
+    PartialRowWriter writer(&sc);
+    srand(1);
+    const int N = 1000;
+    size_t nrow = 0;
+    // add insert/update operation
+    ASSERT_TRUE(writer.start_batch().ok());
+    for (int i = 0; i < N; i++) {
+        nrow++;
+        // Every writer status is checked; an ignored failure here would
+        // only show up later as a confusing mismatch while reading.
+        ASSERT_TRUE(writer.start_row().ok());
+        int id = i;
+        int uv = rand() % 10000;
+        int pv = rand() % 10000;
+        int8_t city = rand() % 100;
+        EXPECT_TRUE(writer.set("id", &id).ok());
+        // Every third row sets all columns, the others only the key.
+        if (i % 3 == 0) {
+            EXPECT_TRUE(writer.set("uv", &uv).ok());
+            EXPECT_TRUE(writer.set("pv", &pv).ok());
+            // An even city value is written as null.
+            EXPECT_TRUE(
+                    writer.set("city", city % 2 == 0 ? nullptr : &city).ok());
+        }
+        EXPECT_TRUE(writer.end_row().ok());
+    }
+    vector<uint8_t> buffer;
+    ASSERT_TRUE(writer.finish_batch(&buffer).ok());
+
+    PartialRowBatch rb(&sc);
+    ASSERT_TRUE(rb.load(std::move(buffer)).ok());
+    ASSERT_EQ(rb.row_size(), nrow);
+    // read from rowbatch and check equality
+    srand(1);
+    for (size_t i = 0; i < nrow; i++) {
+        bool has_row = false;
+        ASSERT_TRUE(rb.next_row(&has_row).ok());
+        ASSERT_TRUE(has_row);
+        if (i % 3 == 0) {
+            ASSERT_EQ(rb.cur_row_cell_size(), 4U);
+        } else {
+            ASSERT_EQ(rb.cur_row_cell_size(), 1U);
+        }
+        int id = i;
+        int uv = rand() % 10000;
+        int pv = rand() % 10000;
+        int8_t city = rand() % 100;
+
+        const ColumnSchema* cs = nullptr;
+        const void* data = nullptr;
+
+        // Fatal assertions guard each dereference of cs / data below.
+        ASSERT_TRUE(rb.cur_row_get_cell(0, &cs, &data).ok());
+        ASSERT_TRUE(cs != nullptr);
+        ASSERT_TRUE(data != nullptr);
+        EXPECT_EQ(cs->cid(), 1U);
+        EXPECT_EQ(*(int32_t*)data, id);
+
+        if (i % 3 == 0) {
+            ASSERT_TRUE(rb.cur_row_get_cell(1, &cs, &data).ok());
+            ASSERT_TRUE(cs != nullptr);
+            ASSERT_TRUE(data != nullptr);
+            EXPECT_EQ(cs->cid(), 2U);
+            EXPECT_EQ(*(int32_t*)data, uv);
+
+            ASSERT_TRUE(rb.cur_row_get_cell(2, &cs, &data).ok());
+            ASSERT_TRUE(cs != nullptr);
+            ASSERT_TRUE(data != nullptr);
+            EXPECT_EQ(cs->cid(), 3U);
+            EXPECT_EQ(*(int32_t*)data, pv);
+
+            ASSERT_TRUE(rb.cur_row_get_cell(3, &cs, &data).ok());
+            ASSERT_TRUE(cs != nullptr);
+            EXPECT_EQ(cs->cid(), 4U);
+            if (city % 2 == 0) {
+                EXPECT_EQ(data, nullptr);
+            } else {
+                ASSERT_TRUE(data != nullptr);
+                EXPECT_EQ(*(int8_t*)data, city);
+            }
+        }
+    }
+    // The batch must be exhausted after nrow rows.
+    bool has_row = false;
+    EXPECT_TRUE(rb.next_row(&has_row).ok());
+    EXPECT_FALSE(has_row);
+}
+
+} // namespace memory
+} // namespace doris
+
+// Test entry point: hand the command line to googletest, then run every
+// registered test and use the result as the process exit code.
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
diff --git a/be/test/olap/memory/schema_test.cpp 
b/be/test/olap/memory/schema_test.cpp
index 8772288..71d7d52 100644
--- a/be/test/olap/memory/schema_test.cpp
+++ b/be/test/olap/memory/schema_test.cpp
@@ -32,6 +32,20 @@ TEST(ColumnSchema, create) {
     EXPECT_TRUE(cs.is_key());
 }
 
+// Schema::create() on a four column description: checks the column counts,
+// that cids are assigned 1..4 in declaration order, and the parsed
+// nullability and type of selected columns.
+TEST(Schema, desc_create) {
+    scoped_refptr<Schema> sc;
+    const Status st =
+            Schema::create("id int,uv int,pv int,city tinyint null", &sc);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(sc->num_columns(), 4);
+    ASSERT_EQ(sc->num_key_columns(), 1);
+    for (size_t idx = 0; idx < 4; idx++) {
+        ASSERT_EQ(sc->get(idx)->cid(), idx + 1);
+    }
+    const ColumnSchema* city = sc->get_by_name("city");
+    const ColumnSchema* pv = sc->get_by_name("pv");
+    const ColumnSchema* uv = sc->get_by_name("uv");
+    ASSERT_EQ(city->is_nullable(), true);
+    ASSERT_EQ(pv->is_nullable(), false);
+    ASSERT_EQ(uv->type(), ColumnType::OLAP_FIELD_TYPE_INT);
+}
+
 TEST(Schema, create) {
     TabletSchemaPB tspb;
     auto cpb = tspb.add_column();
@@ -52,10 +66,10 @@ TEST(Schema, create) {
     tspb.set_is_in_memory(false);
     TabletSchema ts;
     ts.init_from_pb(tspb);
-    Schema schema(ts);
-    EXPECT_EQ(schema.cid_size(), 3);
-    EXPECT_EQ(schema.get_by_name("uid")->name(), std::string("uid"));
-    EXPECT_EQ(schema.get_by_cid(1)->name(), std::string("uid"));
+    scoped_refptr<Schema> schema(new Schema(ts));
+    EXPECT_EQ(schema->cid_size(), 3);
+    EXPECT_EQ(schema->get_by_name("uid")->name(), std::string("uid"));
+    EXPECT_EQ(schema->get_by_cid(1)->name(), std::string("uid"));
 }
 
 } // namespace memory


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to