This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit 23e760a0a9036ad3335dfc08278f409eeb71873e Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Mon Jun 5 22:10:08 2023 +0800 [feature-wip](multi-catalog)(step2)support read max compute data by JNI (#19819) Issue Number: #19679 --- be/src/runtime/descriptors.cpp | 23 +- be/src/runtime/descriptors.h | 23 ++ be/src/vec/CMakeLists.txt | 1 + be/src/vec/exec/jni_connector.cpp | 47 ++-- be/src/vec/exec/jni_connector.h | 1 - be/src/vec/exec/scan/max_compute_jni_reader.cpp | 100 ++++++++ be/src/vec/exec/scan/max_compute_jni_reader.h | 80 +++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 14 ++ fe/fe-core/pom.xml | 1 - .../catalog/external/MaxComputeExternalTable.java | 25 +- .../apache/doris/datasource/ExternalCatalog.java | 15 +- .../datasource/MaxComputeExternalCatalog.java | 61 ++++- .../property/constants/MCProperties.java | 1 + .../doris/planner/external/FileQueryScanNode.java | 11 +- .../doris/planner/external/MaxComputeScanNode.java | 48 +++- fe/java-udf/pom.xml | 39 +++- .../org/apache/doris/hudi/HudiColumnValue.java | 5 + .../main/java/org/apache/doris/jni/JniScanner.java | 5 +- .../org/apache/doris/jni/MaxComputeJniScanner.java | 251 +++++++++++++++++++++ .../java/org/apache/doris/jni/MockJniScanner.java | 5 + .../java/org/apache/doris/jni/vec/ColumnValue.java | 34 +-- .../doris/jni/vec/MaxComputeColumnValue.java | 185 +++++++++++++++ .../org/apache/doris/jni/vec/ScanPredicate.java | 5 + .../org/apache/doris/jni/vec/VectorColumn.java | 2 +- fe/pom.xml | 19 +- gensrc/thrift/Descriptors.thrift | 6 +- gensrc/thrift/PlanNodes.thrift | 1 + gensrc/thrift/Types.thrift | 1 + 28 files changed, 931 insertions(+), 78 deletions(-) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index f4cb3f6dff..a75d3d0e71 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -117,7 +117,8 @@ std::string SlotDescriptor::debug_string() const { } TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc) - : _name(tdesc.tableName), + : _table_type(tdesc.tableType), + _name(tdesc.tableName), _database(tdesc.dbName), _table_id(tdesc.id), _num_cols(tdesc.numCols), @@ -179,6 +180,23 @@ std::string IcebergTableDescriptor::debug_string() const { return out.str(); } +MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc), + _region(tdesc.mcTable.region), + _project(tdesc.mcTable.project), + _table(tdesc.mcTable.table), + _access_key(tdesc.mcTable.access_key), + _secret_key(tdesc.mcTable.secret_key), + _public_access(tdesc.mcTable.public_access) {} + +MaxComputeTableDescriptor::~MaxComputeTableDescriptor() {} + +std::string MaxComputeTableDescriptor::debug_string() const { + std::stringstream out; + out << "MaxComputeTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {} EsTableDescriptor::~EsTableDescriptor() {} @@ -573,6 +591,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::JDBC_TABLE: desc = pool->add(new JdbcTableDescriptor(tdesc)); break; + case TTableType::MAX_COMPUTE_TABLE: + desc = pool->add(new MaxComputeTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 48ea79d879..cdb20f0fb2 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -162,11 +162,13 @@ 
public: return slot_desc->col_pos() < _num_clustering_cols; } + ::doris::TTableType::type table_type() const { return _table_type; } const std::string& name() const { return _name; } const std::string& database() const { return _database; } int32_t table_id() const { return _table_id; } private: + ::doris::TTableType::type _table_type; std::string _name; std::string _database; int32_t _table_id; @@ -218,6 +220,27 @@ public: private: }; +class MaxComputeTableDescriptor : public TableDescriptor { +public: + MaxComputeTableDescriptor(const TTableDescriptor& tdesc); + ~MaxComputeTableDescriptor() override; + std::string debug_string() const override; + const std::string region() const { return _region; } + const std::string project() const { return _project; } + const std::string table() const { return _table; } + const std::string access_key() const { return _access_key; } + const std::string secret_key() const { return _secret_key; } + const std::string public_access() const { return _public_access; } + +private: + std::string _region; + std::string _project; + std::string _table; + std::string _access_key; + std::string _secret_key; + std::string _public_access; +}; + class EsTableDescriptor : public TableDescriptor { public: EsTableDescriptor(const TTableDescriptor& tdesc); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 725c4c88bc..2bf3f245a1 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -352,6 +352,7 @@ set(VEC_FILES exec/format/parquet/bool_rle_decoder.cpp exec/jni_connector.cpp exec/scan/jni_reader.cpp + exec/scan/max_compute_jni_reader.cpp ) if (WITH_MYSQL) diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index fcddbad134..fd5cf52e2b 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -63,14 +63,17 @@ JniConnector::~JniConnector() { } Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { - RETURN_IF_ERROR(JniUtil::GetJNIEnv(&_env)); - if (_env == nullptr) { + // do not cache the JNIEnv in a member field: the number of local-reference frames a JNIEnv can hold is limited, + // so to avoid exhausting a thread's frames we fetch a local JNIEnv inside each method instead of holding one for the whole object.
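+ // (A JNIEnv is also only valid on the thread that obtained it, which is another reason to fetch it per call.)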
+ JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + if (env == nullptr) { return Status::InternalError("Failed to get/create JVM"); } - RETURN_IF_ERROR(_init_jni_scanner(_env, state->batch_size())); + RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size())); // Call org.apache.doris.jni.JniScanner#open - _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); - RETURN_ERROR_IF_EXC(_env); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); + RETURN_ERROR_IF_EXC(env); return Status::OK(); } @@ -87,12 +90,12 @@ Status JniConnector::init( } Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) { - JniLocalFrame jni_frame; - RETURN_IF_ERROR(jni_frame.push(_env)); // Call org.apache.doris.jni.JniScanner#getNextBatchMeta // return the address of meta information - long meta_address = _env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch); - RETURN_ERROR_IF_EXC(_env); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + long meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch); + RETURN_ERROR_IF_EXC(env); if (meta_address == 0) { // Address == 0 when there's no data in scanner *read_rows = 0; @@ -109,25 +112,27 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_fill_block(block, num_rows)); *read_rows = num_rows; *eof = false; - _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); - RETURN_ERROR_IF_EXC(_env); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); + RETURN_ERROR_IF_EXC(env); _has_read += num_rows; return Status::OK(); } Status JniConnector::close() { if (!_closed) { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); // _fill_block may fail and return early, so we also release the table in close(). // org.apache.doris.jni.JniScanner#releaseTable is idempotent - _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); - _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); - _env->DeleteLocalRef(_jni_scanner_obj); - _env->DeleteLocalRef(_jni_scanner_cls); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); + env->DeleteGlobalRef(_jni_scanner_obj); + env->DeleteGlobalRef(_jni_scanner_cls); _closed = true; - jthrowable exc = (_env)->ExceptionOccurred(); + jthrowable exc = (env)->ExceptionOccurred(); if (exc != nullptr) { LOG(FATAL) << "Failed to release jni resource: " - << JniUtil::GetJniExceptionMsg(_env).to_string(); + << JniUtil::GetJniExceptionMsg(env).to_string(); } } return Status::OK(); } @@ -170,7 +175,7 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { RETURN_ERROR_IF_EXC(env); _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V"); RETURN_ERROR_IF_EXC(env); - + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _jni_scanner_obj, &_jni_scanner_obj)); return Status::OK(); } @@ -180,9 +185,11 @@ Status JniConnector::_fill_block(Block* block, size_t num_rows) { auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; RETURN_IF_ERROR(_fill_column(column_ptr, column_type, num_rows)); + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); // The column is not released if _fill_column fails; it will be released when the table is released.
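// (releaseColumn on the Java side frees column i's buffers once its data has been transferred into the block.)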
- _env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i); - RETURN_ERROR_IF_EXC(_env); + env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i); + RETURN_ERROR_IF_EXC(env); } return Status::OK(); } diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 14915ac533..90d5fbcd0a 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -227,7 +227,6 @@ private: long* _meta_ptr; int _meta_index; - JNIEnv* _env = nullptr; int _predicates_length = 0; std::unique_ptr<char[]> _predicates = nullptr; diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.cpp b/be/src/vec/exec/scan/max_compute_jni_reader.cpp new file mode 100644 index 0000000000..f5182931d1 --- /dev/null +++ b/be/src/vec/exec/scan/max_compute_jni_reader.cpp @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "max_compute_jni_reader.h" + +#include <glog/logging.h> + +#include <map> +#include <ostream> + +#include "runtime/descriptors.h" +#include "runtime/types.h" +#include "vec/core/types.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { + +MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc, + const std::vector<SlotDescriptor*>& file_slot_descs, + const TFileRangeDesc& range, RuntimeState* state, + RuntimeProfile* profile) + : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) { + _table_desc = mc_desc; + std::ostringstream required_fields; + std::ostringstream columns_types; + std::vector<std::string> column_names; + int index = 0; + for (auto& desc : _file_slot_descs) { + std::string field = desc->col_name(); + std::string type = JniConnector::get_hive_type(desc->type()); + column_names.emplace_back(field); + if (index == 0) { + required_fields << field; + columns_types << type; + } else { + required_fields << "," << field; + columns_types << "#" << type; + } + index++; + } + std::map<String, String> params = {{"region", _table_desc->region()}, + {"access_key", _table_desc->access_key()}, + {"secret_key", _table_desc->secret_key()}, + {"project", _table_desc->project()}, + {"table", _table_desc->table()}, + {"public_access", _table_desc->public_access()}, + {"start_offset", std::to_string(_range.start_offset)}, + {"split_size", std::to_string(_range.size)}, + {"required_fields", required_fields.str()}, + {"columns_types", columns_types.str()}}; + _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/MaxComputeJniScanner", + params, column_names); +} + +Status MaxComputeJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + 
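// Delegates to the shared JniConnector; the connector is closed as soon as it reports EOF so the JVM-side scanner is released promptly.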
RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof)); + if (*eof) { + RETURN_IF_ERROR(_jni_connector->close()); + } + return Status::OK(); +} + +Status MaxComputeJniReader::get_columns( + std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + for (auto& desc : _file_slot_descs) { + name_to_type->emplace(desc->col_name(), desc->type()); + } + return Status::OK(); +} + +Status MaxComputeJniReader::init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/max_compute_jni_reader.h b/be/src/vec/exec/scan/max_compute_jni_reader.h new file mode 100644 index 0000000000..0b3c809c50 --- /dev/null +++ b/be/src/vec/exec/scan/max_compute_jni_reader.h @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stddef.h> + +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "common/status.h" +#include "exec/olap_common.h" +#include "runtime/descriptors.h" +#include "vec/exec/format/generic_reader.h" +#include "vec/exec/jni_connector.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; +class SlotDescriptor; +namespace vectorized { +class Block; +} // namespace vectorized +struct TypeDescriptor; +} // namespace doris + +namespace doris::vectorized { + +/** + * A JniReader that reads MaxCompute table data through the JNI connector. + * The Java-side counterpart is org.apache.doris.jni.MaxComputeJniScanner, which + * downloads Arrow batches from the MaxCompute tunnel and exchanges them with the BE.
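+ * init_reader() must be called first; it passes the predicate ranges to the JniConnector and opens the Java scanner.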
+ */ +class MaxComputeJniReader : public GenericReader { + ENABLE_FACTORY_CREATOR(MaxComputeJniReader); + +public: + MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc, + const std::vector<SlotDescriptor*>& file_slot_descs, + const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile); + + ~MaxComputeJniReader() override = default; + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + + Status init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + +private: + const MaxComputeTableDescriptor* _table_desc; + const std::vector<SlotDescriptor*>& _file_slot_descs; + const TFileRangeDesc& _range; + RuntimeState* _state; + RuntimeProfile* _profile; + std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + std::unique_ptr<JniConnector> _jni_connector; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index a1a15e61e2..b7f8119553 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -60,6 +60,7 @@ #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/iceberg_reader.h" +#include "vec/exec/scan/max_compute_jni_reader.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exprs/vexpr.h" @@ -588,6 +589,19 @@ Status VFileScanner::_get_next_reader() { Status init_status; // TODO: use data lake type switch (_params.format_type) { + case TFileFormatType::FORMAT_JNI: { + if (_real_tuple_desc->table_desc()->table_type() == + ::doris::TTableType::type::MAX_COMPUTE_TABLE) { + const MaxComputeTableDescriptor* mc_desc = + static_cast<const MaxComputeTableDescriptor*>( + _real_tuple_desc->table_desc()); + std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique( + mc_desc, _file_slot_descs, range, _state, _profile); + init_status = mc_reader->init_reader(_colname_to_value_range); + _cur_reader = std::move(mc_reader); + } + break; + } case TFileFormatType::FORMAT_PARQUET: { std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique( _profile, _params, range, _state->query_options().batch_size, diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 63f6b82143..4bce0193b4 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -410,7 +410,6 @@ under the License. 
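<!-- the odps-sdk-core version is now managed centrally as maxcompute.version in fe/pom.xml -->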
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> - <version>0.43.3-public</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java index 5de781a686..012693bccd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java @@ -32,10 +32,12 @@ import org.apache.doris.thrift.TTableType; import com.aliyun.odps.OdpsType; import com.aliyun.odps.Table; import com.aliyun.odps.type.ArrayTypeInfo; +import com.aliyun.odps.type.CharTypeInfo; import com.aliyun.odps.type.DecimalTypeInfo; import com.aliyun.odps.type.MapTypeInfo; import com.aliyun.odps.type.StructTypeInfo; import com.aliyun.odps.type.TypeInfo; +import com.aliyun.odps.type.VarcharTypeInfo; import com.google.common.collect.Lists; import java.util.ArrayList; @@ -95,13 +97,15 @@ public class MaxComputeExternalTable extends ExternalTable { return Type.BIGINT; } case CHAR: { - return Type.CHAR; - } - case VARCHAR: { - return Type.VARCHAR; + CharTypeInfo charType = (CharTypeInfo) typeInfo; + return ScalarType.createChar(charType.getLength()); } case STRING: { - return Type.STRING; + return ScalarType.createStringType(); + } + case VARCHAR: { + VarcharTypeInfo varcharType = (VarcharTypeInfo) typeInfo; + return ScalarType.createVarchar(varcharType.getLength()); } case JSON: { return Type.UNSUPPORTED; @@ -158,7 +162,11 @@ public class MaxComputeExternalTable extends ExternalTable { public TTableDescriptor toThrift() { List<Column> schema = getFullSchema(); TMCTable tMcTable = new TMCTable(); - tMcTable.setTunnelUrl(((MaxComputeExternalCatalog) catalog).getTunnelUrl()); + MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog; + tMcTable.setRegion(mcCatalog.getRegion()); + tMcTable.setAccessKey(mcCatalog.getAccessKey()); + tMcTable.setSecretKey(mcCatalog.getSecretKey()); + tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess())); // use mc project as dbName tMcTable.setProject(dbName); tMcTable.setTable(name); @@ -168,9 +176,14 @@ public class MaxComputeExternalTable extends ExternalTable { return tTableDescriptor; } + public Table getOdpsTable() { + return odpsTable; + } + @Override public String getMysqlType() { return "BASE TABLE"; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index ae070bf507..a23842c9bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -107,15 +107,6 @@ public abstract class ExternalCatalog // set some default properties when creating catalog } - /** - * @return names of database in this catalog. - */ - // public abstract List<String> listDatabaseNames(SessionContext ctx); - public List<String> listDatabaseNames(SessionContext ctx) { - makeSureInitialized(); - return new ArrayList<>(dbNameToId.keySet()); - } - /** * @param dbName * @return names of tables in specified database @@ -315,9 +306,13 @@ public abstract class ExternalCatalog this.comment = comment; } + /** + * @return names of database in this catalog. 
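+ * Calls makeSureInitialized() first so that a lazily-initialized catalog has loaded its database list.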
+ */ @Override public List<String> getDbNames() { - return listDatabaseNames(null); + makeSureInitialized(); + return new ArrayList<>(dbNameToId.keySet()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java index d3f77a985f..c62abf9ada 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java @@ -24,7 +24,10 @@ import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.tunnel.TableTunnel; +import com.aliyun.odps.tunnel.TunnelException; import com.google.common.base.Strings; +import com.google.gson.annotations.SerializedName; import java.util.ArrayList; import java.util.List; @@ -32,9 +35,16 @@ import java.util.Map; public class MaxComputeExternalCatalog extends ExternalCatalog { private Odps odps; - private String tunnelUrl; + @SerializedName(value = "region") + private String region; + @SerializedName(value = "accessKey") + private String accessKey; + @SerializedName(value = "secretKey") + private String secretKey; + @SerializedName(value = "publicAccess") + private boolean enablePublicAccess; private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api"; - private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun.com"; + private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com"; public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, String comment) { @@ -57,12 +67,30 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { // the region may come in as oss-cn-beijing; strip the oss- prefix for compatibility region = region.replace("oss-", ""); } - this.tunnelUrl = tunnelUrlTemplate.replace("{}", region); + this.region = region; CloudCredential credential = MCProperties.getCredential(props); - Account account = new AliyunAccount(credential.getAccessKey(), credential.getSecretKey()); + if (!credential.isWhole()) { + throw new IllegalArgumentException("Max-Compute credential properties '" + + MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required."); + } + accessKey = credential.getAccessKey(); + secretKey = credential.getSecretKey(); + Account account = new AliyunAccount(accessKey, secretKey); this.odps = new Odps(account); odps.setEndpoint(odpsUrlTemplate.replace("{}", region)); odps.setDefaultProject(defaultProject); + enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false")); + } + + public long getTotalRows(String project, String table) throws TunnelException { + makeSureInitialized(); + TableTunnel tunnel = new TableTunnel(odps); + String tunnelUrl = tunnelUrlTemplate.replace("{}", region); + if (enablePublicAccess) { + tunnelUrl = tunnelUrlTemplate.replace("-inc", ""); + } + tunnel.setEndpoint(tunnelUrl); + return tunnel.createDownloadSession(project, table).getRecordCount(); } public Odps getClient() { @@ -73,6 +101,8 @@ protected List<String> listDatabaseNames() { List<String> result = new ArrayList<>(); try { + // TODO: Figure out how to list all privileged MaxCompute projects as databases. + // For now we only have permission to show the default project.
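+ // Each MaxCompute project is exposed as a single Doris database (see MaxComputeExternalTable#toThrift).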
result.add(odps.projects().get(odps.getDefaultProject()).getName()); } catch (OdpsException e) { throw new RuntimeException(e); } @@ -99,11 +129,26 @@ } /** - * data tunnel url - * @return tunnelUrl, required by jni scanner. + * the region is used to build the data tunnel url + * @return region, required by the jni scanner. */ - public String getTunnelUrl() { + public String getRegion() { + makeSureInitialized(); + return region; + } + + public String getAccessKey() { + makeSureInitialized(); + return accessKey; + } + + public String getSecretKey() { + makeSureInitialized(); + return secretKey; + } + + public boolean enablePublicAccess() { makeSureInitialized(); - return tunnelUrl; + return enablePublicAccess; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java index 0fb4274049..32c8534ace 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java @@ -30,6 +30,7 @@ public class MCProperties extends BaseProperties { public static final String ACCESS_KEY = "mc.access_key"; public static final String SECRET_KEY = "mc.secret_key"; public static final String SESSION_TOKEN = "mc.session_token"; + public static final String PUBLIC_ACCESS = "mc.public_access"; public static CloudCredential getCredential(Map<String, String> props) { return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index b07a48da2b..508b174fb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -194,7 +194,11 @@ throws UserException { TableIf tbl = getTargetTable(); List<Integer> columnIdxs = Lists.newArrayList(); - + // avoid a null pointer: params may have no required slots when two tables are joined + if (params.getRequiredSlots() == null) { + params.setColumnIdxs(columnIdxs); + return; + } for (TFileScanSlotInfo slot : params.getRequiredSlots()) { if (!slot.isIsFileSlot()) { continue; } @@ -273,6 +277,7 @@ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys); // external data lake table if (fileSplit instanceof IcebergSplit) { + // TODO: extract all data lake splits into a factory IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit); } @@ -329,7 +334,9 @@ if (getLocationType() == TFileType.FILE_HDFS) { rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) { + } else if (getLocationType() == TFileType.FILE_S3 + || getLocationType() == TFileType.FILE_BROKER + || getLocationType() == TFileType.FILE_NET) { // need full path rangeDesc.setPath(fileSplit.getPath().toString()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java index 367576ba6c..102292e4c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java @@ -20,13 +20,16 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.MaxComputeExternalTable; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.MaxComputeExternalCatalog; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import com.aliyun.odps.tunnel.TunnelException; import org.apache.hadoop.fs.Path; import java.util.ArrayList; @@ -38,22 +41,24 @@ import java.util.Map; public class MaxComputeScanNode extends FileQueryScanNode { private final MaxComputeExternalTable table; + private final MaxComputeExternalCatalog catalog; + public static final int MIN_SPLIT_SIZE = 4096; public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { super(id, desc, planNodeName, statisticalType, needCheckColumnPriv); table = (MaxComputeExternalTable) desc.getTable(); + catalog = (MaxComputeExternalCatalog) table.getCatalog(); } @Override protected TFileType getLocationType() throws UserException { - return TFileType.FILE_STREAM; + return TFileType.FILE_NET; } @Override public TFileFormatType getFileFormatType() { - // TODO: use max compute format - return TFileFormatType.FORMAT_PARQUET; + return TFileFormatType.FORMAT_JNI; } @Override @@ -74,7 +79,42 @@ public class MaxComputeScanNode extends FileQueryScanNode { @Override protected List<Split> getSplits() throws UserException { List<Split> result = new ArrayList<>(); - result.add(new FileSplit(new Path("/"), 0, -1, -1, 0L, new String[0], Collections.emptyList())); + // String splitPath = catalog.getTunnelUrl(); + // TODO: use single max compute scan node rather than file scan node + com.aliyun.odps.Table odpsTable = table.getOdpsTable(); + if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) { + return result; + } + try { + List<Pair<Long, Long>> sliceRange = new ArrayList<>(); + long totalRows = catalog.getTotalRows(table.getDbName(), table.getName()); + long fileNum = odpsTable.getFileNum(); + long start = 0; + long splitSize = (long) Math.ceil((double) totalRows / fileNum); + if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) { + // use whole split + sliceRange.add(Pair.of(start, totalRows)); + } else { + for (int i = 0; i < fileNum; i++) { + if (start > totalRows) { + break; + } + sliceRange.add(Pair.of(start, splitSize)); + start += splitSize; + } + } + long modificationTime = odpsTable.getLastDataModifiedTime().getTime(); + if (!sliceRange.isEmpty()) { + for (int i = 0; i < sliceRange.size(); i++) { + Pair<Long, Long> range = sliceRange.get(i); + result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second, + totalRows, modificationTime, null, Collections.emptyList())); + } + } + } catch (TunnelException e) { + throw new UserException("Max Compute tunnel SDK exception.", e); + + } return result; } } diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 0242bc2370..66d0123795 
100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -50,10 +50,6 @@ under the License. <artifactId>fe-common</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>hive-catalog-shade</artifactId> - </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> @@ -81,6 +77,36 @@ under the License. <artifactId>clickhouse-jdbc</artifactId> <classifier>all</classifier> </dependency> + <dependency> + <groupId>com.aliyun.odps</groupId> + <artifactId>odps-sdk-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> + </exclusion> + <exclusion> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>9.0.0</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>9.0.0</version> + </dependency> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-presto-bundle</artifactId> @@ -136,7 +162,10 @@ under the License. </exclusion> </exclusions> </dependency> - + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>hive-catalog-shade</artifactId> + </dependency> </dependencies> <build> <finalName>java-udf</finalName> diff --git a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java index 3a5d4059b2..11fbd0558c 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java +++ b/fe/java-udf/src/main/java/org/apache/doris/hudi/HudiColumnValue.java @@ -41,6 +41,11 @@ public class HudiColumnValue implements ColumnValue { return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); } + @Override + public boolean isNull() { + return false; + } + @Override public boolean getBoolean() { return (boolean) inspectObject(); diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java index 3f1f1df462..3d0223ba75 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/JniScanner.java @@ -72,6 +72,7 @@ public abstract class JniScanner { throw e; } if (numRows == 0) { + releaseTable(); return 0; } return getMetaAddress(numRows); @@ -83,7 +84,9 @@ public abstract class JniScanner { } protected void resetTable() { - vectorTable.reset(); + if (vectorTable != null) { + vectorTable.reset(); + } } protected void releaseColumn(int fieldId) { diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java new file mode 100644 index 0000000000..65aa2ccded --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MaxComputeJniScanner.java @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.jni; + +import org.apache.doris.jni.vec.ColumnType; +import org.apache.doris.jni.vec.MaxComputeColumnValue; +import org.apache.doris.jni.vec.ScanPredicate; + +import com.aliyun.odps.Column; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.data.ArrowRecordReader; +import com.aliyun.odps.tunnel.TableTunnel; +import com.aliyun.odps.type.TypeInfo; +import com.aliyun.odps.type.TypeInfoFactory; +import com.google.common.base.Strings; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * MaxCompute JniScanner. The BE reads data from this scanner object. + */ +public class MaxComputeJniScanner extends JniScanner { + private Odps odps; + private TableTunnel tunnel; + + private static final Logger LOG = Logger.getLogger(MaxComputeJniScanner.class); + private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api"; + private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com"; + private static final String REGION = "region"; + private static final String PROJECT = "project"; + private static final String TABLE = "table"; + private static final String ACCESS_KEY = "access_key"; + private static final String SECRET_KEY = "secret_key"; + private static final String START_OFFSET = "start_offset"; + private static final String SPLIT_SIZE = "split_size"; + private static final String PUBLIC_ACCESS = "public_access"; + private final String project; + private final String table; + private MaxComputeColumnValue columnValue; + private long remainBatchRows = 0; + private long totalRows = 0; + private TableTunnel.DownloadSession session; + private ArrowRecordReader curReader; + private List<Column> columns; + private Map<String, Integer> readColumnsId; + private long startOffset = -1L; + private long splitSize = -1L; + + public MaxComputeJniScanner(int batchSize, Map<String, String> params) { + String region = Objects.requireNonNull(params.get(REGION), "required property '" + REGION + "'."); + project = Objects.requireNonNull(params.get(PROJECT), "required property '" + PROJECT + "'."); + table = Objects.requireNonNull(params.get(TABLE), "required property '" + TABLE + "'."); + if (!Strings.isNullOrEmpty(params.get(START_OFFSET)) + && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) { + startOffset = Long.parseLong(params.get(START_OFFSET)); + splitSize = Long.parseLong(params.get(SPLIT_SIZE)); + } + String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'."); + String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property
'" + SECRET_KEY + "'."); + odps = new Odps(new AliyunAccount(accessKey, secretKey)); + odps.setEndpoint(odpsUrlTemplate.replace("{}", region)); + odps.setDefaultProject(project); + tunnel = new TableTunnel(odps); + String tunnelUrl = tunnelUrlTemplate.replace("{}", region); + boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false")); + if (enablePublicAccess) { + tunnelUrl = tunnelUrlTemplate.replace("-inc", ""); + } + tunnel.setEndpoint(tunnelUrl); + String[] requiredFields = params.get("required_fields").split(","); + String[] types = params.get("columns_types").split("#"); + ColumnType[] columnTypes = new ColumnType[types.length]; + for (int i = 0; i < types.length; i++) { + columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]); + } + ScanPredicate[] predicates = new ScanPredicate[0]; + if (params.containsKey("push_down_predicates")) { + long predicatesAddress = Long.parseLong(params.get("push_down_predicates")); + if (predicatesAddress != 0) { + predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); + LOG.info("MaxComputeJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); + } + } + initTableInfo(columnTypes, requiredFields, predicates, batchSize); + } + + @Override + protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates, + int batchSize) { + super.initTableInfo(requiredTypes, requiredFields, predicates, batchSize); + columns = new ArrayList<>(); + readColumnsId = new HashMap<>(); + for (int i = 0; i < fields.length; i++) { + if (!Strings.isNullOrEmpty(fields[i])) { + columns.add(createOdpsColumn(i, types[i])); + readColumnsId.put(fields[i], i); + } + } + // reorder columns + List<Column> columnList = odps.tables().get(table).getSchema().getColumns(); + Map<String, Integer> columnRank = new HashMap<>(); + for (int i = 0; i < columnList.size(); i++) { + columnRank.put(columnList.get(i).getName(), i); + } + // Downloading columns data from Max compute only supports the order of table metadata. + // We might get an error message if no sort here: Column reorder is not supported in legacy arrow mode. + columns.sort((Comparator.comparing(o -> columnRank.get(o.getName())))); + } + + @Override + public void open() throws IOException { + if (columns.isEmpty()) { + return; + } + try { + session = tunnel.createDownloadSession(project, table); + if (splitSize > 0) { + totalRows = Math.min(splitSize, session.getRecordCount()); + } else { + totalRows = session.getRecordCount(); + } + long start = startOffset == -1L ? 
0 : startOffset; + curReader = session.openArrowRecordReader(start, totalRows, columns); + } catch (Exception e) { + throw new IOException(e); + } + remainBatchRows = totalRows; + } + + private Column createOdpsColumn(int colIdx, ColumnType dorisType) { + TypeInfo odpsType; + switch (dorisType.getType()) { + case BOOLEAN: + odpsType = TypeInfoFactory.BOOLEAN; + break; + case TINYINT: + odpsType = TypeInfoFactory.TINYINT; + break; + case SMALLINT: + odpsType = TypeInfoFactory.SMALLINT; + break; + case INT: + odpsType = TypeInfoFactory.INT; + break; + case BIGINT: + odpsType = TypeInfoFactory.BIGINT; + break; + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + case DECIMALV2: + odpsType = TypeInfoFactory.getDecimalTypeInfo(dorisType.getPrecision(), dorisType.getScale()); + break; + case FLOAT: + odpsType = TypeInfoFactory.FLOAT; + break; + case DOUBLE: + odpsType = TypeInfoFactory.DOUBLE; + break; + case DATETIMEV2: + odpsType = TypeInfoFactory.DATETIME; + break; + case DATEV2: + odpsType = TypeInfoFactory.DATE; + break; + case CHAR: + odpsType = TypeInfoFactory.getCharTypeInfo(dorisType.getLength()); + break; + case VARCHAR: + odpsType = TypeInfoFactory.getVarcharTypeInfo(dorisType.getLength()); + break; + case STRING: + odpsType = TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING); + break; + default: + throw new RuntimeException("Unsupported transform for column type: " + dorisType.getType()); + } + return new Column(fields[colIdx], odpsType); + } + + @Override + public void close() throws IOException { + remainBatchRows = 0; + totalRows = 0; + startOffset = -1; + splitSize = -1; + if (curReader != null) { + curReader.close(); + } + } + + @Override + protected int getNext() throws IOException { + if (curReader == null) { + return 0; + } + columnValue = new MaxComputeColumnValue(); + int expectedRows = (int) Math.min(batchSize, remainBatchRows); + int realRows = readVectors(expectedRows); + if (remainBatchRows <= 0) { + return 0; + } + remainBatchRows -= realRows; + return realRows; + } + + private int readVectors(int expectedRows) throws IOException { + VectorSchemaRoot batch; + int curReadRows = 0; + while (curReadRows < expectedRows && (batch = curReader.read()) != null) { + List<FieldVector> fieldVectors = batch.getFieldVectors(); + int batchRows = 0; + for (FieldVector column : fieldVectors) { + columnValue.reset(column); + // LOG.warn("MCJNI read getClass: " + column.getClass()); + batchRows = column.getValueCount(); + for (int j = 0; j < batchRows; j++) { + appendData(readColumnsId.get(column.getName()), columnValue); + } + } + curReadRows += batchRows; + } + return curReadRows; + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java index afc957ec17..c4c4c5f80b 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/MockJniScanner.java @@ -49,6 +49,11 @@ public class MockJniScanner extends JniScanner { this.j = j; } + @Override + public boolean isNull() { + return false; + } + @Override public boolean getBoolean() { return (i + j) % 2 == 0; diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java index da76b9cf33..8d190aa212 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ColumnValue.java @@ -27,38 +27,40 @@ import 
java.util.List; * Column value in vector column */ public interface ColumnValue { - public boolean getBoolean(); + boolean isNull(); + + boolean getBoolean(); // tinyint - public byte getByte(); + byte getByte(); // smallint - public short getShort(); + short getShort(); - public int getInt(); + int getInt(); - public float getFloat(); + float getFloat(); // bigint - public long getLong(); + long getLong(); - public double getDouble(); + double getDouble(); - public BigInteger getBigInteger(); + BigInteger getBigInteger(); - public BigDecimal getDecimal(); + BigDecimal getDecimal(); - public String getString(); + String getString(); - public LocalDate getDate(); + LocalDate getDate(); - public LocalDateTime getDateTime(); + LocalDateTime getDateTime(); - public byte[] getBytes(); + byte[] getBytes(); - public void unpackArray(List<ColumnValue> values); + void unpackArray(List<ColumnValue> values); - public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values); + void unpackMap(List<ColumnValue> keys, List<ColumnValue> values); - public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values); + void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values); } diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java new file mode 100644 index 0000000000..0945f1f326 --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/MaxComputeColumnValue.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.jni.vec; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.util.DecimalUtility; +import org.apache.log4j.Logger; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; + +/** + * MaxCompute Column value in vector column + */ +public class MaxComputeColumnValue implements ColumnValue { + private static final Logger LOG = Logger.getLogger(MaxComputeColumnValue.class); + private int idx; + private FieldVector column; + + public MaxComputeColumnValue() { + idx = 0; + } + + public void reset(FieldVector column) { + this.column = column; + this.idx = 0; + } + + @Override + public boolean isNull() { + return column.isNull(idx); + } + + private void skippedIfNull() { + // nulls have already been processed by appendValue via isNull() + try { + if (column.isNull(idx)) { + idx++; + } + } catch (IndexOutOfBoundsException e) { + // skip the remaining rows + idx++; + } + } + + @Override + public boolean getBoolean() { + skippedIfNull(); + TinyIntVector tinyIntCol = (TinyIntVector) column; + return tinyIntCol.get(idx++) > 0; + } + + @Override + public byte getByte() { + skippedIfNull(); + TinyIntVector tinyIntCol = (TinyIntVector) column; + return tinyIntCol.get(idx++); + } + + @Override + public short getShort() { + skippedIfNull(); + SmallIntVector smallIntCol = (SmallIntVector) column; + return smallIntCol.get(idx++); + } + + @Override + public int getInt() { + skippedIfNull(); + IntVector intCol = (IntVector) column; + return intCol.get(idx++); + } + + @Override + public float getFloat() { + skippedIfNull(); + Float4Vector floatCol = (Float4Vector) column; + return floatCol.get(idx++); + } + + @Override + public long getLong() { + skippedIfNull(); + BigIntVector longCol = (BigIntVector) column; + return longCol.get(idx++); + } + + @Override + public double getDouble() { + skippedIfNull(); + Float8Vector doubleCol = (Float8Vector) column; + return doubleCol.get(idx++); + } + + @Override + public BigInteger getBigInteger() { + skippedIfNull(); + BigIntVector longCol = (BigIntVector) column; + return BigInteger.valueOf(longCol.get(idx++)); + } + + @Override + public BigDecimal getDecimal() { + skippedIfNull(); + DecimalVector decimalCol = (DecimalVector) column; + return DecimalUtility.getBigDecimalFromArrowBuf(column.getDataBuffer(), idx++, + decimalCol.getScale(), DecimalVector.TYPE_WIDTH); + } + + @Override + public String getString() { + skippedIfNull(); + VarCharVector varcharCol = (VarCharVector) column; + String v = varcharCol.getObject(idx++).toString(); + return v == null ? new String(new byte[0]) : v; + } + + @Override + public LocalDate getDate() { + skippedIfNull(); + DateDayVector dateCol = (DateDayVector) column; + Integer intVal = dateCol.getObject(idx++); + return LocalDate.ofEpochDay(intVal == null ?
0 : intVal); + } + + @Override + public LocalDateTime getDateTime() { + skippedIfNull(); + DateMilliVector datetimeCol = (DateMilliVector) column; + LocalDateTime v = datetimeCol.getObject(idx++); + return v == null ? LocalDateTime.MIN : v; + } + + @Override + public byte[] getBytes() { + skippedIfNull(); + VarBinaryVector binaryCol = (VarBinaryVector) column; + byte[] v = binaryCol.getObject(idx++); + return v == null ? new byte[0] : v; + } + + @Override + public void unpackArray(List<ColumnValue> values) { + + } + + @Override + public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) { + + } + + @Override + public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) { + + } +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java index a44de9062a..e02107f143 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/ScanPredicate.java @@ -122,6 +122,11 @@ public class ScanPredicate { return inspectObject().toString(); } + @Override + public boolean isNull() { + return false; + } + @Override public boolean getBoolean() { return (boolean) inspectObject(); diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java index 8ce684ea95..4996191776 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/VectorColumn.java @@ -551,7 +551,7 @@ public class VectorColumn { public void appendValue(ColumnValue o) { ColumnType.Type typeValue = columnType.getType(); - if (o == null) { + if (o == null || o.isNull()) { appendNull(typeValue); return; } diff --git a/fe/pom.xml b/fe/pom.xml index 9124594427..e2da4103d3 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -269,6 +269,7 @@ under the License. <!-- Please modify iceberg.version and avro.version together, you can find avro version info in iceberg mvn repository --> <iceberg.version>1.1.0</iceberg.version> + <maxcompute.version>0.43.3-public</maxcompute.version> <avro.version>1.11.1</avro.version> <!-- hudi --> <hudi.version>0.13.0</hudi.version> @@ -1010,6 +1011,12 @@ under the License. <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + </exclusion> + </exclusions> <scope>provided</scope> </dependency> @@ -1102,7 +1109,17 @@ under the License. 
<artifactId>iceberg-aws</artifactId> <version>${iceberg.version}</version> </dependency> - + <dependency> + <groupId>com.aliyun.odps</groupId> + <artifactId>odps-sdk-core</artifactId> + <version>${maxcompute.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- For Iceberg, must be consistent with Iceberg version --> <dependency> <groupId>org.apache.avro</groupId> diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 631132f934..b848b847a4 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -307,12 +307,16 @@ struct TJdbcTable { 6: optional string jdbc_resource_name 7: optional string jdbc_driver_class 8: optional string jdbc_driver_checksum + } struct TMCTable { - 1: optional string tunnel_url + 1: optional string region 2: optional string project 3: optional string table + 4: optional string access_key + 5: optional string secret_key + 6: optional string public_access } // "Union" of all table types. diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 484b34c3d4..9c98cd4d28 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -114,6 +114,7 @@ enum TFileFormatType { FORMAT_ORC, FORMAT_JSON, FORMAT_PROTO, + FORMAT_JNI, } // In previous versions, the data compression format and file format were stored together, as TFileFormatType, diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index ee62f59aa5..5806483cba 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -645,6 +645,7 @@ enum TFileType { FILE_STREAM, // file content is streaming in the buffer FILE_S3, FILE_HDFS, + FILE_NET, // read file by network, such as http } struct TTabletCommitInfo {
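
A minimal, self-contained sketch of the slicing scheme introduced in MaxComputeScanNode#getSplits, for readers following the split math. The row count and file number below are made-up example values, and Doris's Pair is simplified to a long[] so the snippet compiles on its own:

    import java.util.ArrayList;
    import java.util.List;

    public class SliceRangeSketch {
        static final int MIN_SPLIT_SIZE = 4096;

        // One {start_offset, split_size} slice per underlying file, each sized
        // ceil(totalRows / fileNum); small tables collapse into one whole-table slice.
        static List<long[]> sliceRanges(long totalRows, long fileNum) {
            List<long[]> ranges = new ArrayList<>();
            long start = 0;
            long splitSize = (long) Math.ceil((double) totalRows / fileNum);
            if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
                ranges.add(new long[] {start, totalRows}); // whole-table slice
            } else {
                for (int i = 0; i < fileNum; i++) {
                    if (start > totalRows) {
                        break;
                    }
                    ranges.add(new long[] {start, splitSize});
                    start += splitSize;
                }
            }
            return ranges;
        }

        public static void main(String[] args) {
            // 10000 rows across 3 files -> (0,3334), (3334,3334), (6668,3334);
            // each pair reaches MaxComputeJniScanner as start_offset/split_size, and the
            // scanner clamps the read with Math.min(splitSize, session.getRecordCount()).
            for (long[] r : sliceRanges(10000, 3)) {
                System.out.println("start_offset=" + r[0] + ", split_size=" + r[1]);
            }
        }
    }

The remaining scan parameters travel the same way: max_compute_jni_reader.cpp flattens the projected schema into two delimited strings (required_fields joined with "," and columns_types joined with "#"), which MaxComputeJniScanner splits apart again before calling initTableInfo.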