This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 23e760a0a9036ad3335dfc08278f409eeb71873e
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Mon Jun 5 22:10:08 2023 +0800

    [feature-wip](multi-catalog)(step2)support read max compute data by JNI (#19819)
    
    Issue Number: #19679
---
 be/src/runtime/descriptors.cpp                     |  23 +-
 be/src/runtime/descriptors.h                       |  23 ++
 be/src/vec/CMakeLists.txt                          |   1 +
 be/src/vec/exec/jni_connector.cpp                  |  47 ++--
 be/src/vec/exec/jni_connector.h                    |   1 -
 be/src/vec/exec/scan/max_compute_jni_reader.cpp    | 100 ++++++++
 be/src/vec/exec/scan/max_compute_jni_reader.h      |  80 +++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  14 ++
 fe/fe-core/pom.xml                                 |   1 -
 .../catalog/external/MaxComputeExternalTable.java  |  25 +-
 .../apache/doris/datasource/ExternalCatalog.java   |  15 +-
 .../datasource/MaxComputeExternalCatalog.java      |  61 ++++-
 .../property/constants/MCProperties.java           |   1 +
 .../doris/planner/external/FileQueryScanNode.java  |  11 +-
 .../doris/planner/external/MaxComputeScanNode.java |  48 +++-
 fe/java-udf/pom.xml                                |  39 +++-
 .../org/apache/doris/hudi/HudiColumnValue.java     |   5 +
 .../main/java/org/apache/doris/jni/JniScanner.java |   5 +-
 .../org/apache/doris/jni/MaxComputeJniScanner.java | 251 +++++++++++++++++++++
 .../java/org/apache/doris/jni/MockJniScanner.java  |   5 +
 .../java/org/apache/doris/jni/vec/ColumnValue.java |  34 +--
 .../doris/jni/vec/MaxComputeColumnValue.java       | 185 +++++++++++++++
 .../org/apache/doris/jni/vec/ScanPredicate.java    |   5 +
 .../org/apache/doris/jni/vec/VectorColumn.java     |   2 +-
 fe/pom.xml                                         |  19 +-
 gensrc/thrift/Descriptors.thrift                   |   6 +-
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 gensrc/thrift/Types.thrift                         |   1 +
 28 files changed, 931 insertions(+), 78 deletions(-)

diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index f4cb3f6dff..a75d3d0e71 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -117,7 +117,8 @@ std::string SlotDescriptor::debug_string() const {
 }
 
 TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
-        : _name(tdesc.tableName),
+        : _table_type(tdesc.tableType),
+          _name(tdesc.tableName),
           _database(tdesc.dbName),
           _table_id(tdesc.id),
           _num_cols(tdesc.numCols),
@@ -179,6 +180,23 @@ std::string IcebergTableDescriptor::debug_string() const {
     return out.str();
 }
 
+MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tdesc)
+        : TableDescriptor(tdesc),
+          _region(tdesc.mcTable.region),
+          _project(tdesc.mcTable.project),
+          _table(tdesc.mcTable.table),
+          _access_key(tdesc.mcTable.access_key),
+          _secret_key(tdesc.mcTable.secret_key),
+          _public_access(tdesc.mcTable.public_access) {}
+
+MaxComputeTableDescriptor::~MaxComputeTableDescriptor() {}
+
+std::string MaxComputeTableDescriptor::debug_string() const {
+    std::stringstream out;
+    out << "MaxComputeTable(" << TableDescriptor::debug_string() << ")";
+    return out.str();
+}
+
 EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
 
 EsTableDescriptor::~EsTableDescriptor() {}
@@ -573,6 +591,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
         case TTableType::JDBC_TABLE:
             desc = pool->add(new JdbcTableDescriptor(tdesc));
             break;
+        case TTableType::MAX_COMPUTE_TABLE:
+            desc = pool->add(new MaxComputeTableDescriptor(tdesc));
+            break;
         default:
             DCHECK(false) << "invalid table type: " << tdesc.tableType;
         }
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 48ea79d879..cdb20f0fb2 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -162,11 +162,13 @@ public:
         return slot_desc->col_pos() < _num_clustering_cols;
     }
 
+    ::doris::TTableType::type table_type() const { return _table_type; }
     const std::string& name() const { return _name; }
     const std::string& database() const { return _database; }
     int32_t table_id() const { return _table_id; }
 
 private:
+    ::doris::TTableType::type _table_type;
     std::string _name;
     std::string _database;
     int32_t _table_id;
@@ -218,6 +220,27 @@ public:
 private:
 };
 
+class MaxComputeTableDescriptor : public TableDescriptor {
+public:
+    MaxComputeTableDescriptor(const TTableDescriptor& tdesc);
+    ~MaxComputeTableDescriptor() override;
+    std::string debug_string() const override;
+    const std::string region() const { return _region; }
+    const std::string project() const { return _project; }
+    const std::string table() const { return _table; }
+    const std::string access_key() const { return _access_key; }
+    const std::string secret_key() const { return _secret_key; }
+    const std::string public_access() const { return _public_access; }
+
+private:
+    std::string _region;
+    std::string _project;
+    std::string _table;
+    std::string _access_key;
+    std::string _secret_key;
+    std::string _public_access;
+};
+
 class EsTableDescriptor : public TableDescriptor {
 public:
     EsTableDescriptor(const TTableDescriptor& tdesc);
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 725c4c88bc..2bf3f245a1 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -352,6 +352,7 @@ set(VEC_FILES
   exec/format/parquet/bool_rle_decoder.cpp
   exec/jni_connector.cpp
   exec/scan/jni_reader.cpp
+  exec/scan/max_compute_jni_reader.cpp
   )
 
 if (WITH_MYSQL)
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index fcddbad134..fd5cf52e2b 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -63,14 +63,17 @@ JniConnector::~JniConnector() {
 }
 
 Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
-    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&_env));
-    if (_env == nullptr) {
+    // Cannot cache the env in a field, because the number of frames in an env object is limited.
+    // To avoid running out of frames in a thread, get a local env in each method instead of in the whole object.
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    if (env == nullptr) {
         return Status::InternalError("Failed to get/create JVM");
     }
-    RETURN_IF_ERROR(_init_jni_scanner(_env, state->batch_size()));
+    RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size()));
     // Call org.apache.doris.jni.JniScanner#open
-    _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
-    RETURN_ERROR_IF_EXC(_env);
+    env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
+    RETURN_ERROR_IF_EXC(env);
     return Status::OK();
 }
 
@@ -87,12 +90,12 @@ Status JniConnector::init(
 }
 
 Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
-    JniLocalFrame jni_frame;
-    RETURN_IF_ERROR(jni_frame.push(_env));
     // Call org.apache.doris.jni.JniScanner#getNextBatchMeta
     // return the address of meta information
-    long meta_address = _env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
-    RETURN_ERROR_IF_EXC(_env);
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    long meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
+    RETURN_ERROR_IF_EXC(env);
     if (meta_address == 0) {
         // Address == 0 when there's no data in scanner
         *read_rows = 0;
@@ -109,25 +112,27 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
     RETURN_IF_ERROR(_fill_block(block, num_rows));
     *read_rows = num_rows;
     *eof = false;
-    _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-    RETURN_ERROR_IF_EXC(_env);
+    env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+    RETURN_ERROR_IF_EXC(env);
     _has_read += num_rows;
     return Status::OK();
 }
 
 Status JniConnector::close() {
     if (!_closed) {
+        JNIEnv* env = nullptr;
+        RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+        // _fill_block may fail and return early, so we should release the table in close().
         // org.apache.doris.jni.JniScanner#releaseTable is idempotent
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
-        _env->DeleteLocalRef(_jni_scanner_obj);
-        _env->DeleteLocalRef(_jni_scanner_cls);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close);
+        env->DeleteGlobalRef(_jni_scanner_obj);
+        env->DeleteGlobalRef(_jni_scanner_cls);
         _closed = true;
-        jthrowable exc = (_env)->ExceptionOccurred();
+        jthrowable exc = (env)->ExceptionOccurred();
         if (exc != nullptr) {
             LOG(FATAL) << "Failed to release jni resource: "
-                       << JniUtil::GetJniExceptionMsg(_env).to_string();
+                       << JniUtil::GetJniExceptionMsg(env).to_string();
         }
     }
     return Status::OK();
@@ -170,7 +175,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
     RETURN_ERROR_IF_EXC(env);
     _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V");
     RETURN_ERROR_IF_EXC(env);
-
+    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _jni_scanner_obj, &_jni_scanner_obj));
     return Status::OK();
 }
 
@@ -180,9 +185,11 @@ Status JniConnector::_fill_block(Block* block, size_t num_rows) {
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         RETURN_IF_ERROR(_fill_column(column_ptr, column_type, num_rows));
+        JNIEnv* env = nullptr;
+        RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         // Column is not released when _fill_column failed. It will be released when releasing table.
-        _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i);
-        RETURN_ERROR_IF_EXC(_env);
+        env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i);
+        RETURN_ERROR_IF_EXC(env);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 14915ac533..90d5fbcd0a 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -227,7 +227,6 @@ private:
 
     long* _meta_ptr;
     int _meta_index;
-    JNIEnv* _env = nullptr;
 
     int _predicates_length = 0;
     std::unique_ptr<char[]> _predicates = nullptr;
diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.cpp b/be/src/vec/exec/scan/max_compute_jni_reader.cpp
new file mode 100644
index 0000000000..f5182931d1
--- /dev/null
+++ b/be/src/vec/exec/scan/max_compute_jni_reader.cpp
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "max_compute_jni_reader.h"
+
+#include <glog/logging.h>
+
+#include <map>
+#include <ostream>
+
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+                                         const std::vector<SlotDescriptor*>& file_slot_descs,
+                                         const TFileRangeDesc& range, RuntimeState* state,
+                                         RuntimeProfile* profile)
+        : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) {
+    _table_desc = mc_desc;
+    std::ostringstream required_fields;
+    std::ostringstream columns_types;
+    std::vector<std::string> column_names;
+    int index = 0;
+    for (auto& desc : _file_slot_descs) {
+        std::string field = desc->col_name();
+        std::string type = JniConnector::get_hive_type(desc->type());
+        column_names.emplace_back(field);
+        if (index == 0) {
+            required_fields << field;
+            columns_types << type;
+        } else {
+            required_fields << "," << field;
+            columns_types << "#" << type;
+        }
+        index++;
+    }
+    std::map<String, String> params = {{"region", _table_desc->region()},
+                                       {"access_key", _table_desc->access_key()},
+                                       {"secret_key", _table_desc->secret_key()},
+                                       {"project", _table_desc->project()},
+                                       {"table", _table_desc->table()},
+                                       {"public_access", _table_desc->public_access()},
+                                       {"start_offset", std::to_string(_range.start_offset)},
+                                       {"split_size", std::to_string(_range.size)},
+                                       {"required_fields", required_fields.str()},
+                                       {"columns_types", columns_types.str()}};
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/MaxComputeJniScanner",
+                                                    params, column_names);
+}
+
+Status MaxComputeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
+    if (*eof) {
+        RETURN_IF_ERROR(_jni_connector->close());
+    }
+    return Status::OK();
+}
+
+Status MaxComputeJniReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    for (auto& desc : _file_slot_descs) {
+        name_to_type->emplace(desc->col_name(), desc->type());
+    }
+    return Status::OK();
+}
+
+Status MaxComputeJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.h b/be/src/vec/exec/scan/max_compute_jni_reader.h
new file mode 100644
index 0000000000..0b3c809c50
--- /dev/null
+++ b/be/src/vec/exec/scan/max_compute_jni_reader.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "runtime/descriptors.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * The demo usage of JniReader, showing how to read data from a Java scanner.
+ * The Java side is also a mock reader that provides values for each type.
+ * This class will only be retained during the functional testing phase to verify that
+ * the communication and data exchange with the JVM are correct.
+ */
+class MaxComputeJniReader : public GenericReader {
+    ENABLE_FACTORY_CREATOR(MaxComputeJniReader);
+
+public:
+    MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+                        const std::vector<SlotDescriptor*>& file_slot_descs,
+                        const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile);
+
+    ~MaxComputeJniReader() override = default;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    const MaxComputeTableDescriptor* _table_desc;
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    const TFileRangeDesc& _range;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    std::unique_ptr<JniConnector> _jni_connector;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a1a15e61e2..b7f8119553 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/scan/max_compute_jni_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vexpr.h"
@@ -588,6 +589,19 @@ Status VFileScanner::_get_next_reader() {
         Status init_status;
         // TODO: use data lake type
         switch (_params.format_type) {
+        case TFileFormatType::FORMAT_JNI: {
+            if (_real_tuple_desc->table_desc()->table_type() ==
+                ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
+                const MaxComputeTableDescriptor* mc_desc =
+                        static_cast<const MaxComputeTableDescriptor*>(
+                                _real_tuple_desc->table_desc());
+                std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
+                        mc_desc, _file_slot_descs, range, _state, _profile);
+                init_status = mc_reader->init_reader(_colname_to_value_range);
+                _cur_reader = std::move(mc_reader);
+            }
+            break;
+        }
         case TFileFormatType::FORMAT_PARQUET: {
             std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
                     _profile, _params, range, _state->query_options().batch_size,
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 63f6b82143..4bce0193b4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -410,7 +410,6 @@ under the License.
         <dependency>
             <groupId>com.aliyun.odps</groupId>
             <artifactId>odps-sdk-core</artifactId>
-            <version>0.43.3-public</version>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
         <dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 5de781a686..012693bccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -32,10 +32,12 @@ import org.apache.doris.thrift.TTableType;
 import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.Table;
 import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.CharTypeInfo;
 import com.aliyun.odps.type.DecimalTypeInfo;
 import com.aliyun.odps.type.MapTypeInfo;
 import com.aliyun.odps.type.StructTypeInfo;
 import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.VarcharTypeInfo;
 import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
@@ -95,13 +97,15 @@ public class MaxComputeExternalTable extends ExternalTable {
                 return Type.BIGINT;
             }
             case CHAR: {
-                return Type.CHAR;
-            }
-            case VARCHAR: {
-                return Type.VARCHAR;
+                CharTypeInfo charType = (CharTypeInfo) typeInfo;
+                return ScalarType.createChar(charType.getLength());
             }
             case STRING: {
-                return Type.STRING;
+                return ScalarType.createStringType();
+            }
+            case VARCHAR: {
+                VarcharTypeInfo varcharType = (VarcharTypeInfo) typeInfo;
+                return ScalarType.createVarchar(varcharType.getLength());
             }
             case JSON: {
                 return Type.UNSUPPORTED;
@@ -158,7 +162,11 @@ public class MaxComputeExternalTable extends ExternalTable {
     public TTableDescriptor toThrift() {
         List<Column> schema = getFullSchema();
         TMCTable tMcTable = new TMCTable();
-        tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl());
+        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog;
+        tMcTable.setRegion(mcCatalog.getRegion());
+        tMcTable.setAccessKey(mcCatalog.getAccessKey());
+        tMcTable.setSecretKey(mcCatalog.getSecretKey());
+        tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
         // use mc project as dbName
         tMcTable.setProject(dbName);
         tMcTable.setTable(name);
@@ -168,9 +176,14 @@ public class MaxComputeExternalTable extends ExternalTable {
         return tTableDescriptor;
     }
 
+    public Table getOdpsTable() {
+        return odpsTable;
+    }
+
     @Override
     public String getMysqlType() {
         return "BASE TABLE";
     }
+
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index ae070bf507..a23842c9bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -107,15 +107,6 @@ public abstract class ExternalCatalog
         // set some default properties when creating catalog
     }
 
-    /**
-     * @return names of database in this catalog.
-     */
-    // public abstract List<String> listDatabaseNames(SessionContext ctx);
-    public List<String> listDatabaseNames(SessionContext ctx) {
-        makeSureInitialized();
-        return new ArrayList<>(dbNameToId.keySet());
-    }
-
     /**
      * @param dbName
      * @return names of tables in specified database
@@ -315,9 +306,13 @@ public abstract class ExternalCatalog
         this.comment = comment;
     }
 
+    /**
+     * @return names of database in this catalog.
+     */
     @Override
     public List<String> getDbNames() {
-        return listDatabaseNames(null);
+        makeSureInitialized();
+        return new ArrayList<>(dbNameToId.keySet());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index d3f77a985f..c62abf9ada 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,7 +24,10 @@ import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
 import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,9 +35,16 @@ import java.util.Map;
 
 public class MaxComputeExternalCatalog extends ExternalCatalog {
     private Odps odps;
-    private String tunnelUrl;
+    @SerializedName(value = "region")
+    private String region;
+    @SerializedName(value = "accessKey")
+    private String accessKey;
+    @SerializedName(value = "secretKey")
+    private String secretKey;
+    @SerializedName(value = "publicAccess")
+    private boolean enablePublicAccess;
     private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
-    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
 
     public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
             String comment) {
@@ -57,12 +67,30 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
             // may use oss-cn-beijing, ensure compatible
             region = region.replace("oss-", "");
         }
-        this.tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        this.region = region;
         CloudCredential credential = MCProperties.getCredential(props);
-        Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey());
+        if (!credential.isWhole()) {
+            throw new IllegalArgumentException("Max-Compute credential properties '"
+                    + MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required.");
+        }
+        accessKey = credential.getAccessKey();
+        secretKey = credential.getSecretKey();
+        Account account = new AliyunAccount(accessKey, secretKey);
         this.odps = new Odps(account);
         odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
         odps.setDefaultProject(defaultProject);
+        enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
+    }
+
+    public long getTotalRows(String project, String table) throws TunnelException {
+        makeSureInitialized();
+        TableTunnel tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+        return tunnel.createDownloadSession(project, table).getRecordCount();
     }
 
     public Odps getClient() {
@@ -73,6 +101,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
     protected List<String> listDatabaseNames() {
         List<String> result = new ArrayList<>();
         try {
+            // TODO: How to get all privileged projects from MaxCompute as databases?
+            // For now we only have permission to show the default project.
             result.add(odps.projects().get(odps.getDefaultProject()).getName());
         } catch (OdpsException e) {
             throw new RuntimeException(e);
@@ -99,11 +129,26 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
     }
 
     /**
-     * data tunnel url
-     * @return tunnelUrl, required by jni scanner.
+     * use region to create data tunnel url
+     * @return region, required by jni scanner.
      */
-    public String getTunnelUrl() {
+    public String getRegion() {
+        makeSureInitialized();
+        return region;
+    }
+
+    public String getAccessKey() {
+        makeSureInitialized();
+        return accessKey;
+    }
+
+    public String getSecretKey() {
+        makeSureInitialized();
+        return secretKey;
+    }
+
+    public boolean enablePublicAccess() {
         makeSureInitialized();
-        return tunnelUrl;
+        return enablePublicAccess;
     }
 }
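
Worth noting in the hunk above: getTotalRows() now derives the tunnel endpoint from the region plus the public-access flag instead of a stored URL. A minimal standalone sketch of that selection, using the two templates defined in this catalog (the helper name resolveTunnelEndpoint is hypothetical, and here the "-inc" replacement is applied after the region substitution):

// Hedged sketch of the endpoint choice in getTotalRows(); resolveTunnelEndpoint
// is a hypothetical helper, not part of the patch.
static String resolveTunnelEndpoint(String region, boolean enablePublicAccess) {
    String tunnelUrl = "http://dt.{}.maxcompute.aliyun-inc.com".replace("{}", region);
    if (enablePublicAccess) {
        // "-inc" marks the Alibaba Cloud internal network; drop it for public access
        tunnelUrl = tunnelUrl.replace("-inc", "");
    }
    return tunnelUrl;
}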
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
index 0fb4274049..32c8534ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -30,6 +30,7 @@ public class MCProperties extends BaseProperties {
     public static final String ACCESS_KEY = "mc.access_key";
     public static final String SECRET_KEY = "mc.secret_key";
     public static final String SESSION_TOKEN = "mc.session_token";
+    public static final String PUBLIC_ACCESS = "mc.public_access";
 
     public static CloudCredential getCredential(Map<String, String> props) {
         return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index b07a48da2b..508b174fb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -194,7 +194,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
             throws UserException {
         TableIf tbl = getTargetTable();
         List<Integer> columnIdxs = Lists.newArrayList();
-
+        // Avoid a null pointer: there may be no required slots when two tables are joined.
+        if (params.getRequiredSlots() == null) {
+            params.setColumnIdxs(columnIdxs);
+            return;
+        }
         for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
             if (!slot.isIsFileSlot()) {
                 continue;
@@ -273,6 +277,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
             // external data lake table
             if (fileSplit instanceof IcebergSplit) {
+                // TODO: extract all data lake split to factory
                 IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit);
             }
 
@@ -329,7 +334,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
         if (getLocationType() == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) {
+        } else if (getLocationType() == TFileType.FILE_S3
+                || getLocationType() == TFileType.FILE_BROKER
+                || getLocationType() == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 367576ba6c..102292e4c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -20,13 +20,16 @@ package org.apache.doris.planner.external;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
+import com.aliyun.odps.tunnel.TunnelException;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -38,22 +41,24 @@ import java.util.Map;
 public class MaxComputeScanNode extends FileQueryScanNode {
 
     private final MaxComputeExternalTable table;
+    private final MaxComputeExternalCatalog catalog;
+    public static final int MIN_SPLIT_SIZE = 4096;
 
     public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
                               StatisticalType statisticalType, boolean needCheckColumnPriv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
         table = (MaxComputeExternalTable) desc.getTable();
+        catalog = (MaxComputeExternalCatalog) table.getCatalog();
     }
 
     @Override
     protected TFileType getLocationType() throws UserException {
-        return TFileType.FILE_STREAM;
+        return TFileType.FILE_NET;
     }
 
     @Override
     public TFileFormatType getFileFormatType() {
-        // TODO: use max compute format
-        return TFileFormatType.FORMAT_PARQUET;
+        return TFileFormatType.FORMAT_JNI;
     }
 
     @Override
@@ -74,7 +79,42 @@ public class MaxComputeScanNode extends FileQueryScanNode {
     @Override
     protected List<Split> getSplits() throws UserException {
         List<Split> result = new ArrayList<>();
-        result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList()));
+        // String splitPath = catalog.getTunnelUrl();
+        // TODO: use single max compute scan node rather than file scan node
+        com.aliyun.odps.Table odpsTable = table.getOdpsTable();
+        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
+            return result;
+        }
+        try {
+            List<Pair<Long, Long>> sliceRange = new ArrayList<>();
+            long totalRows = catalog.getTotalRows(table.getDbName(), table.getName());
+            long fileNum = odpsTable.getFileNum();
+            long start = 0;
+            long splitSize = (long) Math.ceil((double) totalRows / fileNum);
+            if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
+                // use whole split
+                sliceRange.add(Pair.of(start, totalRows));
+            } else {
+                for (int i = 0; i < fileNum; i++) {
+                    if (start > totalRows) {
+                        break;
+                    }
+                    sliceRange.add(Pair.of(start, splitSize));
+                    start += splitSize;
+                }
+            }
+            long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
+            if (!sliceRange.isEmpty()) {
+                for (int i = 0; i < sliceRange.size(); i++) {
+                    Pair<Long, Long> range = sliceRange.get(i);
+                    result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second,
+                            totalRows, modificationTime, null, Collections.emptyList()));
+                }
+            }
+        } catch (TunnelException e) {
+            throw new UserException("Max Compute tunnel SDK exception.", e);
+
+        }
         return result;
     }
 }
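
The getSplits() change above slices the table's row count into fileNum contiguous ranges of ceil(totalRows / fileNum) rows, falling back to a single whole-table slice for small tables. A self-contained sketch of that arithmetic (class and method names are illustrative only, not part of the patch):

import java.util.ArrayList;
import java.util.List;

// Illustrative restatement of the slicing in getSplits(); each long[] is {startRow, rowCount}.
class SliceDemo {
    static final int MIN_SPLIT_SIZE = 4096;

    static List<long[]> slice(long totalRows, long fileNum) {
        List<long[]> ranges = new ArrayList<>();
        long splitSize = (long) Math.ceil((double) totalRows / fileNum);
        if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
            ranges.add(new long[] {0, totalRows}); // one whole-table slice
            return ranges;
        }
        for (long start = 0; start < totalRows; start += splitSize) {
            // the last slice may nominally extend past totalRows; the reader side
            // clamps its row count against the download session's record count
            ranges.add(new long[] {start, splitSize});
        }
        return ranges;
    }
}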
diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml
index 0242bc2370..66d0123795 100644
--- a/fe/java-udf/pom.xml
+++ b/fe/java-udf/pom.xml
@@ -50,10 +50,6 @@ under the License.
             <artifactId>fe-common</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>hive-catalog-shade</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
@@ -81,6 +77,36 @@ under the License.
             <artifactId>clickhouse-jdbc</artifactId>
             <classifier>all</classifier>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+            <version>9.0.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-unsafe</artifactId>
+            <version>9.0.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-presto-bundle</artifactId>
@@ -136,7 +162,10 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <finalName>java-udf</finalName>
diff --git a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
index 3a5d4059b2..11fbd0558c 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java
@@ -41,6 +41,11 @@ public class HudiColumnValue implements ColumnValue {
         return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
     }
 
+    @Override
+    public boolean isNull() {
+        return false;
+    }
+
     @Override
     public boolean getBoolean() {
         return (boolean) inspectObject();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
index 3f1f1df462..3d0223ba75 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java
@@ -72,6 +72,7 @@ public abstract class JniScanner {
             throw e;
         }
         if (numRows == 0) {
+            releaseTable();
             return 0;
         }
         return getMetaAddress(numRows);
@@ -83,7 +84,9 @@ public abstract class JniScanner {
     }
 
     protected void resetTable() {
-        vectorTable.reset();
+        if (vectorTable != null) {
+            vectorTable.reset();
+        }
     }
 
     protected void releaseColumn(int fieldId) {
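
The BE side (jni_connector.cpp above) depends on JniScanner#releaseTable being idempotent, and getNext() now releases the table as soon as an empty batch is seen. A hedged sketch of what such an idempotent release can look like, assuming a vectorTable field like the one null-guarded in resetTable(); the close() call is an assumption, not taken from the patch:

// Hypothetical sketch: repeated releaseTable() calls stay safe by null-checking
// and clearing the shared reference, so later calls become no-ops.
protected void releaseTable() {
    if (vectorTable != null) {
        vectorTable.close(); // assumed to free the off-heap column buffers
        vectorTable = null;
    }
}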
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java
new file mode 100644
index 0000000000..65aa2ccded
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.MaxComputeColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.ArrowRecordReader;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.common.base.Strings;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * MaxCompute JniScanner. BE will read data from the scanner object.
+ */
+public class MaxComputeJniScanner extends JniScanner {
+    private Odps odps;
+    private TableTunnel tunnel;
+
+    private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class);
+    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
+    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
+    private static final String REGION = "region";
+    private static final String PROJECT = "project";
+    private static final String TABLE = "table";
+    private static final String ACCESS_KEY = "access_key";
+    private static final String SECRET_KEY = "secret_key";
+    private static final String START_OFFSET = "start_offset";
+    private static final String SPLIT_SIZE = "split_size";
+    private static final String PUBLIC_ACCESS = "public_access";
+    private final String project;
+    private final String table;
+    private MaxComputeColumnValue columnValue;
+    private long remainBatchRows = 0;
+    private long totalRows = 0;
+    private TableTunnel.DownloadSession session;
+    private ArrowRecordReader curReader;
+    private List<Column> columns;
+    private Map<String, Integer> readColumnsId;
+    private long startOffset = -1L;
+    private long splitSize = -1L;
+
+    public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
+        String region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'.");
+        project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'.");
+        table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'.");
+        if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
+                && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
+            startOffset = Long.parseLong(params.get(START_OFFSET));
+            splitSize = Long.parseLong(params.get(SPLIT_SIZE));
+        }
+        String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
+        String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
+        odps = new Odps(new AliyunAccount(accessKey, secretKey));
+        odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
+        odps.setDefaultProject(project);
+        tunnel = new TableTunnel(odps);
+        String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+        boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
+        if (enablePublicAccess) {
+            tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
+        }
+        tunnel.setEndpoint(tunnelUrl);
+        String[] requiredFields = params.get("required_fields").split(",");
+        String[] types = params.get("columns_types").split("#");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    @Override
+    protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
+                                 int batchSize) {
+        super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize);
+        columns = new ArrayList<>();
+        readColumnsId = new HashMap<>();
+        for (int i = 0; i < fields.length; i++) {
+            if (!Strings.isNullOrEmpty(fields[i])) {
+                columns.add(createOdpsColumn(i, types[i]));
+                readColumnsId.put(fields[i], i);
+            }
+        }
+        // reorder columns
+        List<Column> columnList = odps.tables().get(table).getSchema().getColumns();
+        Map<String, Integer> columnRank = new HashMap<>();
+        for (int i = 0; i < columnList.size(); i++) {
+            columnRank.put(columnList.get(i).getName(), i);
+        }
+        // Downloading column data from MaxCompute only supports the order of the table metadata.
+        // Without this sort we may get an error: "Column reorder is not supported in legacy arrow mode."
+        columns.sort((Comparator.comparing(o -> columnRank.get(o.getName()))));
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (columns.isEmpty()) {
+            return;
+        }
+        try {
+            session = tunnel.createDownloadSession(project, table);
+            if (splitSize > 0) {
+                totalRows = Math.min(splitSize, session.getRecordCount());
+            } else {
+                totalRows = session.getRecordCount();
+            }
+            long start = startOffset == -1L ? 0 : startOffset;
+            curReader = session.openArrowRecordReader(start, totalRows, columns);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        remainBatchRows = totalRows;
+    }
+
+    private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
+        TypeInfo odpsType;
+        switch (dorisType.getType()) {
+            case BOOLEAN:
+                odpsType = TypeInfoFactory.BOOLEAN;
+                break;
+            case TINYINT:
+                odpsType = TypeInfoFactory.TINYINT;
+                break;
+            case SMALLINT:
+                odpsType = TypeInfoFactory.SMALLINT;
+                break;
+            case INT:
+                odpsType = TypeInfoFactory.INT;
+                break;
+            case BIGINT:
+                odpsType = TypeInfoFactory.BIGINT;
+                break;
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128:
+            case DECIMALV2:
+                odpsType = TypeInfoFactory.getDecimalTypeInfo(dorisType.getPrecision(), dorisType.getScale());
+                break;
+            case FLOAT:
+                odpsType = TypeInfoFactory.FLOAT;
+                break;
+            case DOUBLE:
+                odpsType = TypeInfoFactory.DOUBLE;
+                break;
+            case DATETIMEV2:
+                odpsType = TypeInfoFactory.DATETIME;
+                break;
+            case DATEV2:
+                odpsType = TypeInfoFactory.DATE;
+                break;
+            case CHAR:
+                odpsType = TypeInfoFactory.getCharTypeInfo(dorisType.getLength());
+                break;
+            case VARCHAR:
+                odpsType = TypeInfoFactory.getVarcharTypeInfo(dorisType.getLength());
+                break;
+            case STRING:
+                odpsType = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform for column type: " + dorisType.getType());
+        }
+        return new Column(fields[colIdx], odpsType);
+    }
+
+    @Override
+    public void close() throws IOException {
+        remainBatchRows = 0;
+        totalRows = 0;
+        startOffset = -1;
+        splitSize = -1;
+        if (curReader != null) {
+            curReader.close();
+        }
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        if (curReader == null) {
+            return 0;
+        }
+        columnValue = new MaxComputeColumnValue();
+        int expectedRows = (int) Math.min(batchSize, remainBatchRows);
+        int realRows = readVectors(expectedRows);
+        if (remainBatchRows <= 0) {
+            return 0;
+        }
+        remainBatchRows -= realRows;
+        return realRows;
+    }
+
+    private int readVectors(int expectedRows) throws IOException {
+        VectorSchemaRoot batch;
+        int curReadRows = 0;
+        while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
+            List<FieldVector> fieldVectors = batch.getFieldVectors();
+            int batchRows = 0;
+            for (FieldVector column : fieldVectors) {
+                columnValue.reset(column);
+                // LOG.warn("MCJNI read getClass: " + column.getClass());
+                batchRows = column.getValueCount();
+                for (int j = 0; j < batchRows; j++) {
+                    appendData(readColumnsId.get(column.getName()), columnValue);
+                }
+            }
+            curReadRows += batchRows;
+        }
+        return curReadRows;
+    }
+}
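
For reference, max_compute_jni_reader.cpp on the BE side assembles the params map that feeds this constructor. A hedged usage sketch with dummy values for those keys (the demo class is hypothetical; real credentials are needed in practice, since construction fetches the table schema from MaxCompute):

import org.apache.doris.jni.MaxComputeJniScanner;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Demo driver (hypothetical); normally the BE drives open()/getNextBatchMeta() via JNI.
public class MaxComputeScanDemo {
    public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put("region", "cn-beijing");           // placeholder region
        params.put("project", "demo_project");        // MaxCompute project, used as dbName
        params.put("table", "demo_table");
        params.put("access_key", "<ak>");             // dummy credentials
        params.put("secret_key", "<sk>");
        params.put("public_access", "true");
        params.put("start_offset", "0");              // row range assigned to this split
        params.put("split_size", "4096");
        params.put("required_fields", "id,name");     // comma-separated field names
        params.put("columns_types", "bigint#string"); // '#'-separated Hive-style types
        MaxComputeJniScanner scanner = new MaxComputeJniScanner(4096, params);
        scanner.open();  // opens an Arrow tunnel reader over the split's row range
        scanner.close();
    }
}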
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
index afc957ec17..c4c4c5f80b 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java
@@ -49,6 +49,11 @@ public class MockJniScanner extends JniScanner {
             this.j = j;
         }
 
+        @Override
+        public boolean isNull() {
+            return false;
+        }
+
         @Override
         public boolean getBoolean() {
             return (i + j) % 2 == 0;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
index da76b9cf33..8d190aa212 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java
@@ -27,38 +27,40 @@ import java.util.List;
  * Column value in vector column
  */
 public interface ColumnValue {
-    public boolean getBoolean();
+    boolean isNull();
+
+    boolean getBoolean();
 
     // tinyint
-    public byte getByte();
+    byte getByte();
 
     // smallint
-    public short getShort();
+    short getShort();
 
-    public int getInt();
+    int getInt();
 
-    public float getFloat();
+    float getFloat();
 
     // bigint
-    public long getLong();
+    long getLong();
 
-    public double getDouble();
+    double getDouble();
 
-    public BigInteger getBigInteger();
+    BigInteger getBigInteger();
 
-    public BigDecimal getDecimal();
+    BigDecimal getDecimal();
 
-    public String getString();
+    String getString();
 
-    public LocalDate getDate();
+    LocalDate getDate();
 
-    public LocalDateTime getDateTime();
+    LocalDateTime getDateTime();
 
-    public byte[] getBytes();
+    byte[] getBytes();
 
-    public void unpackArray(List<ColumnValue> values);
+    void unpackArray(List<ColumnValue> values);
 
-    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values);
+    void unpackMap(List<ColumnValue> keys, List<ColumnValue> values);
 
-    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values);
+    void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values);
 }
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java
new file mode 100644
index 0000000000..0945f1f326
--- /dev/null
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.jni.vec;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * MaxCompute Column value in vector column
+ */
+public class MaxComputeColumnValue implements ColumnValue {
+    private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class);
+    private int idx;
+    private FieldVector column;
+
+    public MaxComputeColumnValue() {
+        idx = 0;
+    }
+
+    public void reset(FieldVector column) {
+        this.column = column;
+        this.idx = 0;
+    }
+
+    @Override
+    public boolean isNull() {
+        return column.isNull(idx);
+    }
+
+    private void skippedIfNull() {
+        // nulls have already been processed by appendValue() via isNull()
+        try {
+            if (column.isNull(idx)) {
+                idx++;
+            }
+        } catch (IndexOutOfBoundsException e) {
+            // skip left rows
+            idx++;
+        }
+    }
+
+    @Override
+    public boolean getBoolean() {
+        skippedIfNull();
+        TinyIntVector tinyIntCol = (TinyIntVector) column;
+        return tinyIntCol.get(idx++) > 0;
+    }
+
+    @Override
+    public byte getByte() {
+        skippedIfNull();
+        TinyIntVector tinyIntCol = (TinyIntVector) column;
+        return tinyIntCol.get(idx++);
+    }
+
+    @Override
+    public short getShort() {
+        skippedIfNull();
+        SmallIntVector smallIntCol = (SmallIntVector) column;
+        return smallIntCol.get(idx++);
+    }
+
+    @Override
+    public int getInt() {
+        skippedIfNull();
+        IntVector intCol = (IntVector) column;
+        return intCol.get(idx++);
+    }
+
+    @Override
+    public float getFloat() {
+        skippedIfNull();
+        Float4Vector floatCol = (Float4Vector) column;
+        return floatCol.get(idx++);
+    }
+
+    @Override
+    public long getLong() {
+        skippedIfNull();
+        BigIntVector longCol = (BigIntVector) column;
+        return longCol.get(idx++);
+    }
+
+    @Override
+    public double getDouble() {
+        skippedIfNull();
+        Float8Vector doubleCol = (Float8Vector) column;
+        return doubleCol.get(idx++);
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        skippedIfNull();
+        BigIntVector longCol = (BigIntVector) column;
+        return BigInteger.valueOf(longCol.get(idx++));
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        skippedIfNull();
+        DecimalVector decimalCol = (DecimalVector) column;
+        return DecimalUtility.getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++,
+                    decimalCol.getScale(), DecimalVector.TYPE_WIDTH);
+    }
+
+    @Override
+    public String getString() {
+        skippedIfNull();
+        VarCharVector varcharCol = (VarCharVector) column;
+        Object v = varcharCol.getObject(idx++); // may be null
+        return v == null ? "" : v.toString();
+    }
+
+    @Override
+    public LocalDate getDate() {
+        skippedIfNull();
+        DateDayVector dateCol = (DateDayVector) column;
+        Integer intVal = dateCol.getObject(idx++);
+        return LocalDate.ofEpochDay(intVal == null ? 0 : intVal);
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        skippedIfNull();
+        DateMilliVector datetimeCol = (DateMilliVector) column;
+        LocalDateTime v = datetimeCol.getObject(idx++);
+        return v == null ? LocalDateTime.MIN : v;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        skippedIfNull();
+        VarBinaryVector binaryCol = (VarBinaryVector) column;
+        byte[] v = binaryCol.getObject(idx++);
+        return v == null ? new byte[0] : v;
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+
+    }
+}
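
A minimal sketch of the wrapper's cursor semantics (hypothetical code, not
part of this patch; it assumes access to the org.apache.doris.jni.vec
classes and uses an illustrative vector name "c0"). reset() rebinds the
wrapper to a vector and rewinds the row cursor; each typed getter returns
the current row's value and advances the cursor, stepping past any null
slot that appendValue() already consumed:

    package org.apache.doris.jni.vec;

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;

    public class CursorSketch {
        public static void main(String[] args) {
            try (RootAllocator allocator = new RootAllocator();
                    IntVector ints = new IntVector("c0", allocator)) {
                ints.allocateNew(3);
                ints.set(0, 7);
                ints.setNull(1);
                ints.set(2, 42);
                ints.setValueCount(3);

                MaxComputeColumnValue value = new MaxComputeColumnValue();
                value.reset(ints);
                System.out.println(value.getInt());  // 7; cursor moves to row 1
                System.out.println(value.isNull());  // true: row 1 is null
                System.out.println(value.getInt());  // 42; getter skips the null row
            }
        }
    }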
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
index a44de9062a..e02107f143 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java
@@ -122,6 +122,11 @@ public class ScanPredicate {
             return inspectObject().toString();
         }
 
+        @Override
+        public boolean isNull() {
+            return false;
+        }
+
         @Override
         public boolean getBoolean() {
             return (boolean) inspectObject();
diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
index 8ce684ea95..4996191776 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java
@@ -551,7 +551,7 @@ public class VectorColumn {
 
     public void appendValue(ColumnValue o) {
         ColumnType.Type typeValue = columnType.getType();
-        if (o == null) {
+        if (o == null || o.isNull()) {
             appendNull(typeValue);
             return;
         }
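
The isNull() check above is what lets a JNI reader drive a whole Arrow batch
through appendValue() without testing nulls itself. A sketch under the
assumption that the scanner holds one VectorSchemaRoot per batch and one
VectorColumn per output slot (class and variable names are illustrative,
not from this patch):

    package org.apache.doris.jni.vec;

    import org.apache.arrow.vector.VectorSchemaRoot;

    public class BatchCopySketch {
        static void copyBatch(VectorSchemaRoot root, VectorColumn[] columns) {
            MaxComputeColumnValue value = new MaxComputeColumnValue();
            for (int col = 0; col < columns.length; col++) {
                value.reset(root.getVector(col));  // rewind cursor per column
                for (int row = 0; row < root.getRowCount(); row++) {
                    // a null cell takes the appendNull() path and never
                    // reaches a typed getter
                    columns[col].appendValue(value);
                }
            }
        }
    }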
diff --git a/fe/pom.xml b/fe/pom.xml
index 9124594427..e2da4103d3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -269,6 +269,7 @@ under the License.
         <!-- Please modify iceberg.version and avro.version together,
          you can find avro version info in iceberg mvn repository -->
         <iceberg.version>1.1.0</iceberg.version>
+        <maxcompute.version>0.43.3-public</maxcompute.version>
         <avro.version>1.11.1</avro.version>
         <!-- hudi -->
         <hudi.version>0.13.0</hudi.version>
@@ -1010,6 +1011,12 @@ under the License.
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-sql_2.12</artifactId>
                 <version>${spark.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.arrow</groupId>
+                        <artifactId>arrow-vector</artifactId>
+                    </exclusion>
+                </exclusions>
                 <scope>provided</scope>
             </dependency>
 
@@ -1102,7 +1109,17 @@ under the License.
                 <artifactId>iceberg-aws</artifactId>
                 <version>${iceberg.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>com.aliyun.odps</groupId>
+                <artifactId>odps-sdk-core</artifactId>
+                <version>${maxcompute.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.arrow</groupId>
+                        <artifactId>arrow-vector</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <!-- For Iceberg, must be consistent with Iceberg version -->
             <dependency>
                 <groupId>org.apache.avro</groupId>
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 631132f934..b848b847a4 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -307,12 +307,16 @@ struct TJdbcTable {
   6: optional string jdbc_resource_name
   7: optional string jdbc_driver_class
   8: optional string jdbc_driver_checksum
 }
 
 struct TMCTable {
-  1: optional string tunnel_url
+  1: optional string region
   2: optional string project
   3: optional string table
+  4: optional string access_key
+  5: optional string secret_key
+  6: optional string public_access
 }
 
 // "Union" of all table types.
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 484b34c3d4..9c98cd4d28 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -114,6 +114,7 @@ enum TFileFormatType {
     FORMAT_ORC,
     FORMAT_JSON,
     FORMAT_PROTO,
+    FORMAT_JNI,
 }
 
 // In previous versions, the data compression format and file format were stored together, as TFileFormatType,
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index ee62f59aa5..5806483cba 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -645,6 +645,7 @@ enum TFileType {
     FILE_STREAM,    // file content is streaming in the buffer
     FILE_S3,
     FILE_HDFS,
+    FILE_NET,       // read file over the network, e.g. via HTTP
 }
 
 struct TTabletCommitInfo {

