xiaokang commented on code in PR #15491:
URL: https://github.com/apache/doris/pull/15491#discussion_r1062070504


##########
be/src/util/mysql_global.h:
##########
@@ -26,13 +26,16 @@ typedef unsigned char uchar;
 
 #define int1store(T, A) *((uint8_t*)(T)) = (uint8_t)(A)
 #define int2store(T, A) *((uint16_t*)(T)) = (uint16_t)(A)
+#define int4store(T, A) *((uint32_t*)(T)) = (uint32_t)(A)
 #define int3store(T, A)                           \
     do {                                          \
         *(T) = (uchar)((A));                      \
         *(T + 1) = (uchar)(((uint32_t)(A) >> 8)); \
         *(T + 2) = (uchar)(((A) >> 16));          \
     } while (0)
 #define int8store(T, A) *((int64_t*)(T)) = (uint64_t)(A)
+#define float4store(T, A) memcpy(T, (&A), sizeof(float))

Review Comment:
   Why is float not the same as int?



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const 
doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, 
MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE: {
+            assert(slot_value->isDouble());
+            dst->insert(static_cast<JsonbDoubleVal*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATE: {
+            assert(slot_value->isInt32());
+            int32_t val = static_cast<JsonbInt32Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 
1);
+            break;
+        }
+        case TYPE_DATETIME: {
+            assert(slot_value->isInt64());
+            int64_t val = static_cast<JsonbInt64Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 
1);
+            break;
+        }
+        case TYPE_DATEV2: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATETIMEV2: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        default:
+            LOG(FATAL) << "unknow type " << type;
+            break;
+        }
+    }
+}
+
+// column value -> jsonb
+static void serialize_column(Arena* mem_pool, const TabletColumn& 
tablet_column,
+                             const IColumn* column, const StringRef& data_ref, 
int row,
+                             JsonbWriterT<JsonbOutStream>& jsonb_writer) {
+    jsonb_writer.writeKey(tablet_column.unique_id());
+    if (is_column_null_at(row, column, tablet_column.type(), data_ref)) {
+        jsonb_writer.writeNull();
+        return;
+    }
+    if (tablet_column.is_array_type()) {
+        const char* begin = nullptr;
+        StringRef value = column->serialize_value_into_arena(row, *mem_pool, 
begin);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(value.data, value.size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_OBJECT) {
+        auto bitmap_value = (BitmapValue*)(data_ref.data);
+        auto size = bitmap_value->getSizeInBytes();
+        // serialize the content of string
+        auto ptr = mem_pool->alloc(size);
+        bitmap_value->write(reinterpret_cast<char*>(ptr));
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_HLL) {
+        auto hll_value = (HyperLogLog*)(data_ref.data);
+        auto size = hll_value->max_serialized_size();
+        auto ptr = reinterpret_cast<char*>(mem_pool->alloc(size));
+        size_t actual_size = hll_value->serialize((uint8_t*)ptr);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), 
actual_size);
+        jsonb_writer.writeEndBinary();
+    } else if (is_jsonb_blob_type(tablet_column.type())) {
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(data_ref.data), 
data_ref.size);
+        jsonb_writer.writeEndBinary();
+    } else {
+        switch (tablet_column.type()) {
+        case OLAP_FIELD_TYPE_BOOL: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_TINYINT: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_SMALLINT: {
+            int16_t val = *reinterpret_cast<const int16_t*>(data_ref.data);
+            jsonb_writer.writeInt16(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_INT: {
+            int32_t val = *reinterpret_cast<const int32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_BIGINT: {
+            int64_t val = *reinterpret_cast<const int64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_LARGEINT: {
+            __int128_t val = *reinterpret_cast<const 
__int128_t*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_FLOAT: {
+            float val = *reinterpret_cast<const float*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DOUBLE: {
+            double val = *reinterpret_cast<const double*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATE: {
+            const auto* datetime_cur = reinterpret_cast<const 
VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt32(datetime_cur->to_olap_date());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIME: {
+            const auto* datetime_cur = reinterpret_cast<const 
VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt64(datetime_cur->to_olap_datetime());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATEV2: {
+            uint32_t val = *reinterpret_cast<const uint32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIMEV2: {
+            uint64_t val = *reinterpret_cast<const uint64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL32: {
+            Decimal32::NativeType val =
+                    *reinterpret_cast<const 
Decimal32::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL64: {
+            Decimal64::NativeType val =
+                    *reinterpret_cast<const 
Decimal64::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL128I: {
+            Decimal128I::NativeType val =
+                    *reinterpret_cast<const 
Decimal128I::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL:
+            LOG(FATAL) << "OLAP_FIELD_TYPE_DECIMAL not implemented use 
DecimalV3 instead";
+            break;
+        default:
+            LOG(FATAL) << "unknow type " << tablet_column.type();
+            break;
+        }
+    }
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                        ColumnString& dst, int num_cols) {
+    auto num_rows = block.rows();
+    Arena pool;
+    assert(num_cols <= block.columns());
+    for (int i = 0; i < num_rows; ++i) {
+        JsonbWriterT<JsonbOutStream> jsonb_writer;
+        jsonb_writer.writeStartObject();
+        for (int j = 0; j < num_cols; ++j) {
+            const auto& column = block.get_by_position(j).column;
+            const auto& tablet_column = schema.columns()[j];
+            const auto& data_ref =
+                    !tablet_column.is_array_type() ? column->get_data_at(i) : 
StringRef();

Review Comment:
   Why use empty StringRef for array?



##########
be/src/util/mysql_global.h:
##########
@@ -26,13 +26,16 @@ typedef unsigned char uchar;
 
 #define int1store(T, A) *((uint8_t*)(T)) = (uint8_t)(A)
 #define int2store(T, A) *((uint16_t*)(T)) = (uint16_t)(A)
+#define int4store(T, A) *((uint32_t*)(T)) = (uint32_t)(A)

Review Comment:
   It's better to keep 1 2 3 4 order



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const 
doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, 
MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO

Review Comment:
   add impl



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const 
doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, 
MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:

Review Comment:
   we can add float in JSONB to avoid precision problem



##########
be/src/service/tablet_lookup_metric.h:
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "butil/containers/doubly_buffered_data.h"
+#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gutil/int128.h"
+#include "olap/tablet.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+// For caching point lookup pre allocted blocks and exprs
+class Reusable {
+public:
+    ~Reusable();
+
+    bool is_expired(int64_t ttl_ms) const {
+        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
+    }
+
+    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& 
output_exprs,
+                size_t block_size = 1);
+
+    RuntimeState* runtime_state() { return _runtime_state.get(); }
+
+    std::unique_ptr<vectorized::Block> get_block();
+
+    // do not touch block after returned
+    void return_block(std::unique_ptr<vectorized::Block>& block);
+
+    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); 
}
+
+    const std::vector<vectorized::VExprContext*>& output_exprs() { return 
_output_exprs_ctxs; }
+
+private:
+    // caching TupleDescriptor, output_expr, etc...
+    std::unique_ptr<RuntimeState> _runtime_state;
+    DescriptorTbl* _desc_tbl;
+    std::mutex _block_mutex;
+    // prevent from allocte too many tmp blocks
+    std::vector<std::unique_ptr<vectorized::Block>> _block_pool;
+    std::vector<vectorized::VExprContext*> _output_exprs_ctxs;
+    int64_t _create_timestamp = 0;
+};
+
+// A cache used for prepare stmt.
+// One connection per stmt perf uuid
+// Use DoublyBufferedData to wrap Cache for performance and thread safe,
+// since it's not barely modified
+class LookupCache {
+public:
+    // uuid to reusable
+    using Cache = phmap::flat_hash_map<uint128, std::shared_ptr<Reusable>>;
+    using CacheIter = Cache::iterator;
+
+    LookupCache() = default;
+    static LookupCache& instance() {
+        static LookupCache ins;
+        return ins;
+    }
+
+    void add(uint128 cache_id, std::shared_ptr<Reusable> item) {
+        assert(item != nullptr);
+        _double_buffer_cache.Modify(update_cache, std::make_pair(cache_id, 
item));
+    }
+
+    // find an item, return null if not exist
+    std::shared_ptr<Reusable> get(uint128 cache_id) {
+        butil::DoublyBufferedData<Cache>::ScopedPtr s;
+        if (_double_buffer_cache.Read(&s) != 0) {
+            LOG(WARNING) << "failed to get cache from double buffer data";
+            return nullptr;
+        }
+        auto it = s->find(cache_id);
+        if (it != s->end()) {
+            return it->second;
+        }
+        return nullptr;
+    }
+
+private:
+    butil::DoublyBufferedData<Cache> _double_buffer_cache;
+    // 30 seconds for expiring an item
+    int32_t _expir_seconds = config::tablet_lookup_cache_clean_interval;
+
+    static size_t update_cache(Cache& old_cache,
+                               const std::pair<uint128, 
std::shared_ptr<Reusable>>& p) {
+        old_cache.emplace(p);
+        return 1;
+    }
+
+    static size_t remove_items(Cache& old_cache, const std::vector<uint128>& 
keys) {
+        for (size_t i = 0; i < keys.size(); ++i) {
+            old_cache.erase(keys[i]);
+        }
+        return 1;
+    }
+
+    // Called from StorageEngine::_start_clean_lookup_cache
+    friend class StorageEngine;
+    void prune() {
+        std::vector<uint128> expired_keys;
+        {
+            butil::DoublyBufferedData<Cache>::ScopedPtr s;
+            if (_double_buffer_cache.Read(&s) != 0) {
+                return;
+            }
+            for (auto it = s->begin(); it != s->end(); ++it) {
+                if (it->second->is_expired(_expir_seconds * 1000)) {
+                    expired_keys.push_back(it->first);
+                }
+            }
+        }
+
+        _double_buffer_cache.Modify(remove_items, expired_keys);
+        LOG(INFO) << "prune lookup cache, total " << expired_keys.size() << " 
expired items";
+    }
+};
+
+struct Metrics {
+    Metrics()
+            : init_ns(TUnit::TIME_NS),
+              init_key_ns(TUnit::TIME_NS),
+              lookup_key_ns(TUnit::TIME_NS),
+              lookup_data_ns(TUnit::TIME_NS),
+              output_data_ns(TUnit::TIME_NS) {}
+    RuntimeProfile::Counter init_ns;
+    RuntimeProfile::Counter init_key_ns;
+    RuntimeProfile::Counter lookup_key_ns;
+    RuntimeProfile::Counter lookup_data_ns;
+    RuntimeProfile::Counter output_data_ns;
+};
+
+// An util to do tablet lookup
+class TabletLookupMetric {

Review Comment:
   Is there any speical meaning for Metric in the class name?



##########
be/src/olap/tablet_schema.cpp:
##########
@@ -920,11 +923,25 @@ bool operator==(const TabletSchema& a, const 
TabletSchema& b) {
     if (a._is_in_memory != b._is_in_memory) return false;
     if (a._delete_sign_idx != b._delete_sign_idx) return false;
     if (a._disable_auto_compaction != b._disable_auto_compaction) return false;
+    if (a._store_row_column != b._store_row_column) return false;
     return true;
 }
 
 bool operator!=(const TabletSchema& a, const TabletSchema& b) {
     return !(a == b);
 }
 
+const TabletColumn& TabletSchema::row_oriented_column() {
+    static TabletColumn source_column(OLAP_FIELD_AGGREGATION_NONE,

Review Comment:
   maybe can use a const field



##########
be/src/service/tablet_lookup_metric.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/tablet_lookup_metric.h"
+
+#include "olap/row_cursor.h"
+#include "olap/storage_engine.h"
+#include "service/internal_service.h"
+#include "util/defer_op.h"
+#include "util/key_util.h"
+#include "util/runtime_profile.h"
+#include "util/thrift_util.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/sink/vmysql_result_writer.cpp"
+
+namespace doris {
+
+Reusable::~Reusable() {
+    for (vectorized::VExprContext* ctx : _output_exprs_ctxs) {
+        ctx->close(_runtime_state.get());
+    }
+}
+
+Status Reusable::init(const TDescriptorTable& t_desc_tbl, const 
std::vector<TExpr>& output_exprs,
+                      size_t block_size) {
+    _runtime_state.reset(new RuntimeState());
+    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
t_desc_tbl, &_desc_tbl));
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _block_pool.resize(block_size);
+    for (int i = 0; i < _block_pool.size(); ++i) {
+        _block_pool[i] = 
std::make_unique<vectorized::Block>(tuple_desc()->slots(), 10);
+    }
+
+    
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_runtime_state->obj_pool(),
 output_exprs,
+                                                         &_output_exprs_ctxs));
+    RowDescriptor row_desc(tuple_desc(), false);
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, 
_runtime_state.get(), row_desc));
+    _create_timestamp = butil::gettimeofday_ms();
+    return Status::OK();
+}
+
+std::unique_ptr<vectorized::Block> Reusable::get_block() {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.empty()) {
+        return std::make_unique<vectorized::Block>(tuple_desc()->slots(), 4);
+    }
+    auto block = std::move(_block_pool.back());
+    _block_pool.pop_back();
+    return block;
+}
+
+void Reusable::return_block(std::unique_ptr<vectorized::Block>& block) {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.size() > 128) {
+        return;
+    }
+    block->clear_column_data();
+    _block_pool.push_back(std::move(block));
+}
+
+Status TabletLookupMetric::init(const PTabletKeyLookupRequest* request,
+                                PTabletKeyLookupResponse* response) {
+    SCOPED_TIMER(&_profile_metrics.init_ns);
+    _response = response;
+    // using cache
+    uint128 uuid {static_cast<uint64_t>(request->uuid().uuid_high()),
+                  static_cast<uint64_t>(request->uuid().uuid_low())};
+    auto cache_handle = LookupCache::instance().get(uuid);
+    _binary_row_format = request->is_binary_row();
+    if (cache_handle != nullptr) {
+        _reusable = cache_handle;
+        _hit_lookup_cache = true;
+    } else {
+        // init handle
+        auto reusable_ptr = std::make_shared<Reusable>();
+        TDescriptorTable t_desc_tbl;
+        TExprList t_output_exprs;
+        uint32_t len = request->desc_tbl().size();
+        RETURN_IF_ERROR(
+                deserialize_thrift_msg(reinterpret_cast<const 
uint8_t*>(request->desc_tbl().data()),
+                                       &len, false, &t_desc_tbl));
+        len = request->output_expr().size();
+        RETURN_IF_ERROR(deserialize_thrift_msg(
+                reinterpret_cast<const 
uint8_t*>(request->output_expr().data()), &len, false,
+                &t_output_exprs));
+        _reusable = reusable_ptr;
+        if (uuid != 0) {
+            LookupCache::instance().add(uuid, reusable_ptr);
+            // could be reused by requests after, pre allocte more blocks
+            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, 
t_output_exprs.exprs, 128));
+        } else {
+            RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, 
t_output_exprs.exprs, 1));
+        }
+    }
+    _tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id());
+    if (_tablet == nullptr) {
+        LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << 
request->tablet_id()
+                     << "] is not exist";
+        return Status::NotFound(fmt::format("tablet {} not exist", 
request->tablet_id()));
+    }
+    RETURN_IF_ERROR(_init_keys(request));
+    _result_block = _reusable->get_block();
+    DCHECK(_result_block != nullptr);
+    return Status::OK();
+}
+
+Status TabletLookupMetric::lookup_up() {
+    RETURN_IF_ERROR(_lookup_row_key());
+    RETURN_IF_ERROR(_lookup_row_data());
+    RETURN_IF_ERROR(_output_data());
+    return Status::OK();
+}
+
+std::string TabletLookupMetric::print_profile() {
+    auto init_us = _profile_metrics.init_ns.value() / 1000;
+    auto init_key_us = _profile_metrics.init_key_ns.value() / 1000;
+    auto lookup_key_us = _profile_metrics.lookup_key_ns.value() / 1000;
+    auto lookup_data_us = _profile_metrics.lookup_data_ns.value() / 1000;
+    auto output_data_us = _profile_metrics.output_data_ns.value() / 1000;
+    auto total_us = init_us + lookup_key_us + lookup_data_us + output_data_us;
+    return fmt::format(
+            ""
+            "[lookup profile:{}us] init:{}us, init_key:{}us,"
+            ""
+            ""
+            "lookup_key:{}us, lookup_data:{}us, output_data:{}us, 
hit_lookup_cache:{}"
+            ""
+            ""
+            ", is_binary_row:{}, output_columns:{}"
+            "",
+            total_us, init_us, init_key_us, lookup_key_us, lookup_data_us, 
output_data_us,
+            _hit_lookup_cache, _binary_row_format, 
_reusable->output_exprs().size());
+}
+
+Status TabletLookupMetric::_init_keys(const PTabletKeyLookupRequest* request) {
+    SCOPED_TIMER(&_profile_metrics.init_key_ns);
+    // 1. get primary key from conditions
+    std::vector<OlapTuple> olap_tuples;
+    olap_tuples.resize(request->key_tuples().size());
+    for (size_t i = 0; i < request->key_tuples().size(); ++i) {
+        const KeyTuple& key_tuple = request->key_tuples(i);
+        for (const std::string& key_col : key_tuple.key_column_rep()) {
+            olap_tuples[i].add_value(key_col);
+        }
+    }
+    _primary_keys.resize(olap_tuples.size());
+    // get row cursor and encode keys
+    for (size_t i = 0; i < olap_tuples.size(); ++i) {
+        RowCursor cursor;
+        RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(), 
olap_tuples[i].values()));
+        RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i]));
+        encode_key_with_padding<RowCursor, true, true>(
+                &_primary_keys[i], cursor, 
_tablet->tablet_schema()->num_key_columns(), true);
+    }
+    return Status::OK();
+}
+
+Status TabletLookupMetric::_lookup_row_key() {
+    SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
+    _row_locations.reserve(_primary_keys.size());
+    // 2. lookup row location
+    Status st;
+    for (size_t i = 0; i < _primary_keys.size(); ++i) {
+        RowLocation location;
+        st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location,
+                                      INT32_MAX /*rethink?*/));
+        if (st.is_not_found()) {
+            continue;

Review Comment:
   maybe add a mark in _row_locations



##########
be/src/vec/CMakeLists.txt:
##########
@@ -55,6 +55,7 @@ set(VEC_FILES
   columns/column_string.cpp
   columns/column_vector.cpp
   columns/columns_common.cpp
+  jsonb/serialize.cpp

Review Comment:
   move to 'j' position



##########
be/src/util/mysql_row_buffer.cpp:
##########
@@ -290,7 +392,11 @@ int MysqlRowBuffer::push_double(double data) {
     return 0;
 }
 
-int MysqlRowBuffer::push_time(double data) {
+template <bool is_binary_format>
+int MysqlRowBuffer<is_binary_format>::push_time(double data) {
+    if constexpr (is_binary_format) {
+        LOG(FATAL) << "not implemented";

Review Comment:
   add impl



##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -244,6 +244,7 @@ Status SegmentIterator::_get_row_ranges_by_keys() {
         auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
         RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
     }
+

Review Comment:
   just blank line in this file diff



##########
be/src/olap/delta_writer.cpp:
##########
@@ -440,6 +440,7 @@ void DeltaWriter::_build_current_tablet_schema(int64_t 
index_id,
                                                     
ptable_schema_param.indexes(i),
                                                     ori_tablet_schema);
     }
+

Review Comment:
   just blank line



##########
be/src/service/internal_service.cpp:
##########
@@ -391,6 +393,25 @@ void 
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
     st.to_protobuf(result->mutable_status());
 }
 
+Status PInternalServiceImpl::_tablet_fetch_data(const PTabletKeyLookupRequest* 
request,
+                                                PTabletKeyLookupResponse* 
response) {
+    TabletLookupMetric lookup_util;
+    RETURN_IF_ERROR(lookup_util.init(request, response));
+    RETURN_IF_ERROR(lookup_util.lookup_up());
+    LOG(INFO) << lookup_util.print_profile();

Review Comment:
   is INFO suitable?



##########
be/src/service/tablet_lookup_metric.h:
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "butil/containers/doubly_buffered_data.h"
+#include "common/status.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "gutil/int128.h"
+#include "olap/tablet.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+// For caching point lookup pre allocted blocks and exprs
+class Reusable {
+public:
+    ~Reusable();
+
+    bool is_expired(int64_t ttl_ms) const {
+        return butil::gettimeofday_ms() - _create_timestamp > ttl_ms;
+    }
+
+    Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& 
output_exprs,
+                size_t block_size = 1);
+
+    RuntimeState* runtime_state() { return _runtime_state.get(); }
+
+    std::unique_ptr<vectorized::Block> get_block();
+
+    // do not touch block after returned
+    void return_block(std::unique_ptr<vectorized::Block>& block);
+
+    TupleDescriptor* tuple_desc() { return _desc_tbl->get_tuple_descriptor(0); 
}
+
+    const std::vector<vectorized::VExprContext*>& output_exprs() { return 
_output_exprs_ctxs; }
+
+private:
+    // caching TupleDescriptor, output_expr, etc...
+    std::unique_ptr<RuntimeState> _runtime_state;

Review Comment:
   Is it safe to destroy runtime_state at the end of Reusable's life cycle?



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/jsonb/serialize.h"
+
+#include "olap/hll.h"
+#include "olap/tablet_schema.h"
+#include "runtime/jsonb_value.h"
+#include "util/jsonb_stream.h"
+#include "util/jsonb_writer.h"
+#include "vec/common/arena.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+static inline bool is_column_null_at(int row, const IColumn* column, const 
doris::FieldType& type,
+                                     const StringRef& data_ref) {
+    if (type != OLAP_FIELD_TYPE_ARRAY) {
+        return data_ref.data == nullptr;
+    } else {
+        Field array;
+        column->get(row, array);
+        return array.is_null();
+    }
+}
+
+static bool is_jsonb_blob_type(FieldType type) {
+    return type == OLAP_FIELD_TYPE_VARCHAR || type == OLAP_FIELD_TYPE_CHAR ||
+           type == OLAP_FIELD_TYPE_STRING || type == OLAP_FIELD_TYPE_STRUCT ||
+           type == OLAP_FIELD_TYPE_ARRAY || type == OLAP_FIELD_TYPE_MAP ||
+           type == OLAP_FIELD_TYPE_HLL || type == OLAP_FIELD_TYPE_OBJECT ||
+           type == OLAP_FIELD_TYPE_JSONB;
+}
+
+// jsonb -> column value
+static void deserialize_column(PrimitiveType type, JsonbValue* slot_value, 
MutableColumnPtr& dst) {
+    if (type == TYPE_ARRAY) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->deserialize_and_insert_from_arena(blob->getBlob());
+    } else if (type == TYPE_OBJECT) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (type == TYPE_HLL) {
+        assert(slot_value->isBinary());
+        // TODO
+    } else if (is_string_type(type)) {
+        assert(slot_value->isBinary());
+        auto blob = static_cast<JsonbBlobVal*>(slot_value);
+        dst->insert_data(blob->getBlob(), blob->getBlobLen());
+    } else {
+        switch (type) {
+        case TYPE_BOOLEAN: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_TINYINT: {
+            assert(slot_value->isInt8());
+            dst->insert(static_cast<JsonbInt8Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_SMALLINT: {
+            assert(slot_value->isInt16());
+            dst->insert(static_cast<JsonbInt16Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_INT: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_BIGINT: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_LARGEINT: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE: {
+            assert(slot_value->isDouble());
+            dst->insert(static_cast<JsonbDoubleVal*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATE: {
+            assert(slot_value->isInt32());
+            int32_t val = static_cast<JsonbInt32Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 
1);
+            break;
+        }
+        case TYPE_DATETIME: {
+            assert(slot_value->isInt64());
+            int64_t val = static_cast<JsonbInt64Val*>(slot_value)->val();
+            dst->insert_many_fix_len_data(reinterpret_cast<const char*>(&val), 
1);
+            break;
+        }
+        case TYPE_DATEV2: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DATETIMEV2: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            assert(slot_value->isInt32());
+            dst->insert(static_cast<JsonbInt32Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            assert(slot_value->isInt64());
+            dst->insert(static_cast<JsonbInt64Val*>(slot_value)->val());
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            assert(slot_value->isInt128());
+            dst->insert(static_cast<JsonbInt128Val*>(slot_value)->val());
+            break;
+        }
+        default:
+            LOG(FATAL) << "unknow type " << type;
+            break;
+        }
+    }
+}
+
+// column value -> jsonb
+static void serialize_column(Arena* mem_pool, const TabletColumn& 
tablet_column,
+                             const IColumn* column, const StringRef& data_ref, 
int row,
+                             JsonbWriterT<JsonbOutStream>& jsonb_writer) {
+    jsonb_writer.writeKey(tablet_column.unique_id());
+    if (is_column_null_at(row, column, tablet_column.type(), data_ref)) {
+        jsonb_writer.writeNull();
+        return;
+    }
+    if (tablet_column.is_array_type()) {
+        const char* begin = nullptr;
+        StringRef value = column->serialize_value_into_arena(row, *mem_pool, 
begin);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(value.data, value.size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_OBJECT) {
+        auto bitmap_value = (BitmapValue*)(data_ref.data);
+        auto size = bitmap_value->getSizeInBytes();
+        // serialize the content of string
+        auto ptr = mem_pool->alloc(size);
+        bitmap_value->write(reinterpret_cast<char*>(ptr));
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), size);
+        jsonb_writer.writeEndBinary();
+    } else if (tablet_column.type() == OLAP_FIELD_TYPE_HLL) {
+        auto hll_value = (HyperLogLog*)(data_ref.data);
+        auto size = hll_value->max_serialized_size();
+        auto ptr = reinterpret_cast<char*>(mem_pool->alloc(size));
+        size_t actual_size = hll_value->serialize((uint8_t*)ptr);
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(ptr), 
actual_size);
+        jsonb_writer.writeEndBinary();
+    } else if (is_jsonb_blob_type(tablet_column.type())) {
+        jsonb_writer.writeStartBinary();
+        jsonb_writer.writeBinary(reinterpret_cast<const char*>(data_ref.data), 
data_ref.size);
+        jsonb_writer.writeEndBinary();
+    } else {
+        switch (tablet_column.type()) {
+        case OLAP_FIELD_TYPE_BOOL: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_TINYINT: {
+            int8_t val = *reinterpret_cast<const int8_t*>(data_ref.data);
+            jsonb_writer.writeInt8(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_SMALLINT: {
+            int16_t val = *reinterpret_cast<const int16_t*>(data_ref.data);
+            jsonb_writer.writeInt16(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_INT: {
+            int32_t val = *reinterpret_cast<const int32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_BIGINT: {
+            int64_t val = *reinterpret_cast<const int64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_LARGEINT: {
+            __int128_t val = *reinterpret_cast<const 
__int128_t*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_FLOAT: {
+            float val = *reinterpret_cast<const float*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DOUBLE: {
+            double val = *reinterpret_cast<const double*>(data_ref.data);
+            jsonb_writer.writeDouble(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATE: {
+            const auto* datetime_cur = reinterpret_cast<const 
VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt32(datetime_cur->to_olap_date());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIME: {
+            const auto* datetime_cur = reinterpret_cast<const 
VecDateTimeValue*>(data_ref.data);
+            jsonb_writer.writeInt64(datetime_cur->to_olap_datetime());
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATEV2: {
+            uint32_t val = *reinterpret_cast<const uint32_t*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DATETIMEV2: {
+            uint64_t val = *reinterpret_cast<const uint64_t*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL32: {
+            Decimal32::NativeType val =
+                    *reinterpret_cast<const 
Decimal32::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt32(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL64: {
+            Decimal64::NativeType val =
+                    *reinterpret_cast<const 
Decimal64::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt64(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL128I: {
+            Decimal128I::NativeType val =
+                    *reinterpret_cast<const 
Decimal128I::NativeType*>(data_ref.data);
+            jsonb_writer.writeInt128(val);
+            break;
+        }
+        case OLAP_FIELD_TYPE_DECIMAL:
+            LOG(FATAL) << "OLAP_FIELD_TYPE_DECIMAL not implemented use 
DecimalV3 instead";
+            break;
+        default:
+            LOG(FATAL) << "unknow type " << tablet_column.type();
+            break;
+        }
+    }
+}
+
+void JsonbSerializeUtil::block_to_jsonb(const TabletSchema& schema, const 
Block& block,
+                                        ColumnString& dst, int num_cols) {
+    auto num_rows = block.rows();
+    Arena pool;
+    assert(num_cols <= block.columns());
+    for (int i = 0; i < num_rows; ++i) {
+        JsonbWriterT<JsonbOutStream> jsonb_writer;

Review Comment:
   writer can be reused and reset for each row



##########
be/src/service/tablet_lookup_metric.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/tablet_lookup_metric.h"
+
+#include "olap/row_cursor.h"
+#include "olap/storage_engine.h"
+#include "service/internal_service.h"
+#include "util/defer_op.h"
+#include "util/key_util.h"
+#include "util/runtime_profile.h"
+#include "util/thrift_util.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/sink/vmysql_result_writer.cpp"
+
+namespace doris {
+
+Reusable::~Reusable() {
+    for (vectorized::VExprContext* ctx : _output_exprs_ctxs) {
+        ctx->close(_runtime_state.get());
+    }
+}
+
+Status Reusable::init(const TDescriptorTable& t_desc_tbl, const 
std::vector<TExpr>& output_exprs,
+                      size_t block_size) {
+    _runtime_state.reset(new RuntimeState());
+    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
t_desc_tbl, &_desc_tbl));
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _block_pool.resize(block_size);
+    for (int i = 0; i < _block_pool.size(); ++i) {
+        _block_pool[i] = 
std::make_unique<vectorized::Block>(tuple_desc()->slots(), 10);
+    }
+
+    
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_runtime_state->obj_pool(),
 output_exprs,
+                                                         &_output_exprs_ctxs));
+    RowDescriptor row_desc(tuple_desc(), false);
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, 
_runtime_state.get(), row_desc));
+    _create_timestamp = butil::gettimeofday_ms();
+    return Status::OK();
+}
+
+std::unique_ptr<vectorized::Block> Reusable::get_block() {
+    std::lock_guard lock(_block_mutex);
+    if (_block_pool.empty()) {
+        return std::make_unique<vectorized::Block>(tuple_desc()->slots(), 4);

Review Comment:
   magic number



##########
be/src/runtime/descriptors.h:
##########
@@ -302,6 +307,13 @@ class JdbcTableDescriptor : public TableDescriptor {
 class TupleDescriptor {
 public:
     // virtual ~TupleDescriptor() {}
+    ~TupleDescriptor() {

Review Comment:
   If we add explicit dtor, we should add explicit move ctor & assignment.



##########
be/src/olap/tablet.cpp:
##########
@@ -1844,6 +1846,103 @@ TabletSchemaSPtr 
Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
     return _max_version_schema;
 }
 
+RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
+    for (auto& version_rowset : _rs_version_map) {
+        if (version_rowset.second->rowset_id() == rowset_id) {
+            return version_rowset.second;
+        }
+    }
+    for (auto& stale_version_rowset : _stale_rs_version_map) {
+        if (stale_version_rowset.second->rowset_id() == rowset_id) {
+            return stale_version_rowset.second;
+        }
+    }
+    return nullptr;
+}
+
+Status Tablet::lookup_row_data(const RowLocation& row_location, const 
TupleDescriptor* desc,
+                               vectorized::Block* block) {
+    // read row data
+    BetaRowsetSharedPtr rowset =
+            
std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
+    if (!rowset) {
+        return Status::NotFound(
+                fmt::format("rowset {} not found", 
row_location.rowset_id.to_string()));
+    }
+
+    const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+    SegmentCacheHandle segment_cache;
+    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true));
+    // find segment
+    auto it = std::find_if(segment_cache.get_segments().begin(), 
segment_cache.get_segments().end(),
+                           [&row_location](const segment_v2::SegmentSharedPtr& 
seg) {
+                               return seg->id() == row_location.segment_id;
+                           });
+    if (it == segment_cache.get_segments().end()) {
+        return Status::NotFound(fmt::format("rowset {} 's segemnt not found, 
seg_id {}",
+                                            row_location.rowset_id.to_string(),
+                                            row_location.segment_id));
+    }
+    // read from segment column by column, row by row
+    segment_v2::SegmentSharedPtr segment = *it;
+    size_t row_size = 0;
+    MonotonicStopWatch watch;
+    watch.start();
+    Defer _defer([&]() {
+        LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << 
watch.elapsed_time() / 1000
+                               << ", row_size:" << row_size;
+    });
+    // TODO(lhy) too long, refacor
+    if (tablet_schema->store_row_column()) {
+        // create _source column
+        segment_v2::ColumnIterator* column_iterator = nullptr;
+        RETURN_IF_ERROR(segment->new_row_column_iterator(&column_iterator));
+        std::unique_ptr<segment_v2::ColumnIterator> ptr_guard(column_iterator);
+        segment_v2::ColumnIteratorOptions opt;
+        OlapReaderStatistics stats;
+        opt.file_reader = segment->file_reader().get();
+        opt.stats = &stats;
+        opt.use_page_cache = !config::disable_storage_page_cache;
+        column_iterator->init(opt);
+        // get and parse tuple row
+        vectorized::MutableColumnPtr column_ptr =
+                vectorized::DataTypeFactory::instance()
+                        .create_data_type(TabletSchema::row_oriented_column())
+                        ->create_column();
+        std::vector<segment_v2::rowid_t> rowids {
+                static_cast<segment_v2::rowid_t>(row_location.row_id)};
+        RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, 
column_ptr));
+        assert(column_ptr->size() == 1);
+        auto string_column = 
static_cast<vectorized::ColumnString*>(column_ptr.get());
+        vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, 
*block);
+        return Status::OK();
+    }
+    // read from segment column by column, row by row
+    for (int x = 0; x < desc->slots().size(); ++x) {
+        int index = 
tablet_schema->field_index(desc->slots()[x]->col_unique_id());
+        auto column = block->get_by_position(x).column->assume_mutable();
+        // TODO handle real default value

Review Comment:
   what's real default value?



##########
be/src/service/tablet_lookup_metric.cpp:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/tablet_lookup_metric.h"
+
+#include "olap/row_cursor.h"
+#include "olap/storage_engine.h"
+#include "service/internal_service.h"
+#include "util/defer_op.h"
+#include "util/key_util.h"
+#include "util/runtime_profile.h"
+#include "util/thrift_util.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/sink/vmysql_result_writer.cpp"
+
+namespace doris {
+
+Reusable::~Reusable() {
+    for (vectorized::VExprContext* ctx : _output_exprs_ctxs) {
+        ctx->close(_runtime_state.get());
+    }
+}
+
+Status Reusable::init(const TDescriptorTable& t_desc_tbl, const 
std::vector<TExpr>& output_exprs,
+                      size_t block_size) {
+    _runtime_state.reset(new RuntimeState());
+    RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
t_desc_tbl, &_desc_tbl));
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _block_pool.resize(block_size);
+    for (int i = 0; i < _block_pool.size(); ++i) {
+        _block_pool[i] = 
std::make_unique<vectorized::Block>(tuple_desc()->slots(), 10);

Review Comment:
   magic number 10



##########
be/src/runtime/datetime_value.h:
##########
@@ -588,11 +588,13 @@ class DateTimeValue {
 
     template <typename T>
     void convert_from_date_v2(doris::vectorized::DateV2Value<T>* dt) {
+        this->_microsecond = 0;

Review Comment:
   Is it a bug of convert_from_date_v2?



##########
be/src/olap/tablet.cpp:
##########
@@ -1844,6 +1846,103 @@ TabletSchemaSPtr 
Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
     return _max_version_schema;
 }
 
+RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
+    for (auto& version_rowset : _rs_version_map) {
+        if (version_rowset.second->rowset_id() == rowset_id) {
+            return version_rowset.second;
+        }
+    }
+    for (auto& stale_version_rowset : _stale_rs_version_map) {
+        if (stale_version_rowset.second->rowset_id() == rowset_id) {
+            return stale_version_rowset.second;
+        }
+    }
+    return nullptr;
+}
+
+Status Tablet::lookup_row_data(const RowLocation& row_location, const 
TupleDescriptor* desc,
+                               vectorized::Block* block) {
+    // read row data
+    BetaRowsetSharedPtr rowset =
+            
std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
+    if (!rowset) {
+        return Status::NotFound(
+                fmt::format("rowset {} not found", 
row_location.rowset_id.to_string()));
+    }
+
+    const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
+    SegmentCacheHandle segment_cache;
+    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true));
+    // find segment
+    auto it = std::find_if(segment_cache.get_segments().begin(), 
segment_cache.get_segments().end(),
+                           [&row_location](const segment_v2::SegmentSharedPtr& 
seg) {
+                               return seg->id() == row_location.segment_id;
+                           });
+    if (it == segment_cache.get_segments().end()) {
+        return Status::NotFound(fmt::format("rowset {} 's segemnt not found, 
seg_id {}",
+                                            row_location.rowset_id.to_string(),
+                                            row_location.segment_id));
+    }
+    // read from segment column by column, row by row
+    segment_v2::SegmentSharedPtr segment = *it;
+    size_t row_size = 0;
+    MonotonicStopWatch watch;
+    watch.start();
+    Defer _defer([&]() {
+        LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << 
watch.elapsed_time() / 1000
+                               << ", row_size:" << row_size;
+    });
+    // TODO(lhy) too long, refacor
+    if (tablet_schema->store_row_column()) {
+        // create _source column
+        segment_v2::ColumnIterator* column_iterator = nullptr;
+        RETURN_IF_ERROR(segment->new_row_column_iterator(&column_iterator));
+        std::unique_ptr<segment_v2::ColumnIterator> ptr_guard(column_iterator);
+        segment_v2::ColumnIteratorOptions opt;
+        OlapReaderStatistics stats;
+        opt.file_reader = segment->file_reader().get();
+        opt.stats = &stats;
+        opt.use_page_cache = !config::disable_storage_page_cache;
+        column_iterator->init(opt);
+        // get and parse tuple row
+        vectorized::MutableColumnPtr column_ptr =
+                vectorized::DataTypeFactory::instance()
+                        .create_data_type(TabletSchema::row_oriented_column())
+                        ->create_column();
+        std::vector<segment_v2::rowid_t> rowids {
+                static_cast<segment_v2::rowid_t>(row_location.row_id)};
+        RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, 
column_ptr));
+        assert(column_ptr->size() == 1);
+        auto string_column = 
static_cast<vectorized::ColumnString*>(column_ptr.get());
+        vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, 
*block);
+        return Status::OK();
+    }
+    // read from segment column by column, row by row
+    for (int x = 0; x < desc->slots().size(); ++x) {

Review Comment:
   x -> i



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to