This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit f4f86b07d842a242edd1cf4abe1ede8019c89584 Author: yuxuan-luo <119841515+yuxuan-...@users.noreply.github.com> AuthorDate: Tue Jun 6 15:08:30 2023 +0800 [Feature](multi-catalog)support paimon catalog (#19681) CREATE CATALOG paimon_n2 PROPERTIES ( "dfs.ha.namenodes.HDFS1006531" = "nn2,nn1", "dfs.namenode.rpc-address.HDFS1006531.nn2" = "172.16.65.xx:4007", "dfs.namenode.rpc-address.HDFS1006531.nn1" = "172.16.65.xx:4007", "hive.metastore.uris" = "thrift://172.16.65.xx:7004", "type" = "paimon", "dfs.nameservices" = "HDFS1006531", "hadoop.username" = "hadoop", "paimon.catalog.type" = "hms", "warehouse" = "hdfs://HDFS1006531/data/paimon1", "dfs.client.failover.proxy.provider.HDFS1006531" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" ); --- be/src/vec/CMakeLists.txt | 1 + be/src/vec/exec/scan/paimon_reader.cpp | 84 ++++++++++ be/src/vec/exec/scan/paimon_reader.h | 76 +++++++++ be/src/vec/exec/scan/vfile_scanner.cpp | 8 + fe/fe-core/pom.xml | 19 +++ .../java/org/apache/doris/catalog/TableIf.java | 4 +- .../catalog/external/PaimonExternalDatabase.java | 72 ++++++++ .../catalog/external/PaimonExternalTable.java | 132 +++++++++++++++ .../apache/doris/datasource/CatalogFactory.java | 4 + .../apache/doris/datasource/ExternalCatalog.java | 3 + .../apache/doris/datasource/InitCatalogLog.java | 1 + .../apache/doris/datasource/InitDatabaseLog.java | 1 + .../datasource/paimon/PaimonExternalCatalog.java | 103 ++++++++++++ .../paimon/PaimonHMSExternalCatalog.java | 105 ++++++++++++ .../property/constants/PaimonProperties.java} | 18 +- .../glue/translator/PhysicalPlanTranslator.java | 4 + .../org/apache/doris/persist/gson/GsonUtils.java | 8 + .../apache/doris/planner/SingleNodePlanner.java | 4 + .../doris/planner/external/FileQueryScanNode.java | 4 + .../doris/planner/external/TableFormatType.java | 3 +- .../planner/external/paimon/PaimonScanNode.java | 177 ++++++++++++++++++++ .../planner/external/paimon/PaimonSource.java | 64 +++++++ 
.../doris/planner/external/paimon/PaimonSplit.java | 65 +++++++ .../org/apache/doris/statistics/DeriveFactory.java | 1 + .../apache/doris/statistics/StatisticalType.java | 1 + fe/java-udf/pom.xml | 21 ++- .../org/apache/doris/jni/PaimonJniScanner.java | 186 +++++++++++++++++++++ .../apache/doris/jni/vec/PaimonColumnValue.java | 131 +++++++++++++++ fe/pom.xml | 2 +- gensrc/thrift/PlanNodes.thrift | 24 ++- 30 files changed, 1299 insertions(+), 27 deletions(-) diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 2bf3f245a1..bec6a747b5 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -352,6 +352,7 @@ set(VEC_FILES exec/format/parquet/bool_rle_decoder.cpp exec/jni_connector.cpp exec/scan/jni_reader.cpp + exec/scan/paimon_reader.cpp exec/scan/max_compute_jni_reader.cpp ) diff --git a/be/src/vec/exec/scan/paimon_reader.cpp b/be/src/vec/exec/scan/paimon_reader.cpp new file mode 100644 index 0000000000..906973d838 --- /dev/null +++ b/be/src/vec/exec/scan/paimon_reader.cpp @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "paimon_reader.h" + +#include <map> +#include <ostream> + +#include "runtime/descriptors.h" +#include "runtime/types.h" +#include "vec/core/types.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { + +PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, + RuntimeState* state, RuntimeProfile* profile, + const TFileRangeDesc& range) + : _file_slot_descs(file_slot_descs), _state(state), _profile(profile) { + std::vector<std::string> column_names; + for (auto& desc : _file_slot_descs) { + std::string field = desc->col_name(); + column_names.emplace_back(field); + } + std::map<String, String> params; + params["required_fields"] = range.table_format_params.paimon_params.paimon_column_names; + params["columns_types"] = range.table_format_params.paimon_params.paimon_column_types; + params["columns_id"] = range.table_format_params.paimon_params.paimon_column_ids; + params["hive.metastore.uris"] = range.table_format_params.paimon_params.hive_metastore_uris; + params["warehouse"] = range.table_format_params.paimon_params.warehouse; + params["db_name"] = range.table_format_params.paimon_params.db_name; + params["table_name"] = range.table_format_params.paimon_params.table_name; + params["length_byte"] = range.table_format_params.paimon_params.length_byte; + params["split_byte"] = + std::to_string((int64_t)range.table_format_params.paimon_params.paimon_split.data()); + _jni_connector = std::make_unique<JniConnector>("org/apache/doris/jni/PaimonJniScanner", params, + column_names); +} + +Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof)); + if (*eof) { + RETURN_IF_ERROR(_jni_connector->close()); + } + return Status::OK(); +} + +Status PaimonJniReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + for (auto& desc : _file_slot_descs) { + name_to_type->emplace(desc->col_name(), desc->type()); + } + return Status::OK(); +} + +Status PaimonJniReader::init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/paimon_reader.h b/be/src/vec/exec/scan/paimon_reader.h new file mode 100644 index 0000000000..be90d6d849 --- /dev/null +++ b/be/src/vec/exec/scan/paimon_reader.h @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <stddef.h> + +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "common/status.h" +#include "exec/olap_common.h" +#include "vec/exec/format/generic_reader.h" +#include "vec/exec/jni_connector.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; +class SlotDescriptor; +namespace vectorized { +class Block; +} // namespace vectorized +struct TypeDescriptor; +} // namespace doris + +namespace doris::vectorized { + +/** + * The demo usage of JniReader, showing how to read data from java scanner. + * The java side is also a mock reader that provide values for each type. + * This class will only be retained during the functional testing phase to verify that + * the communication and data exchange with the jvm are correct. + */ +class PaimonJniReader : public GenericReader { + ENABLE_FACTORY_CREATOR(PaimonJniReader); + +public: + PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, + RuntimeProfile* profile, const TFileRangeDesc& range); + + ~PaimonJniReader() override = default; + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + + Status init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + +private: + const std::vector<SlotDescriptor*>& _file_slot_descs; + RuntimeState* _state; + RuntimeProfile* _profile; + std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + std::unique_ptr<JniConnector> _jni_connector; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index b7f8119553..a539abc9f0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -62,6 +62,7 @@ 
#include "vec/exec/format/table/iceberg_reader.h" #include "vec/exec/scan/max_compute_jni_reader.h" #include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/paimon_reader.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" @@ -600,6 +601,13 @@ Status VFileScanner::_get_next_reader() { init_status = mc_reader->init_reader(_colname_to_value_range); _cur_reader = std::move(mc_reader); } + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "paimon") { + _cur_reader = + PaimonJniReader::create_unique(_file_slot_descs, _state, _profile, range); + init_status = ((PaimonJniReader*)(_cur_reader.get())) + ->init_reader(_colname_to_value_range); + } break; } case TFileFormatType::FORMAT_PARQUET: { diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 4bce0193b4..db8e0b7ec1 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -34,6 +34,7 @@ under the License. <fe_ut_parallel>1</fe_ut_parallel> <antlr4.version>4.9.3</antlr4.version> <awssdk.version>2.17.257</awssdk.version> + <paimon.version>0.4-SNAPSHOT</paimon.version> </properties> <profiles> <profile> @@ -529,6 +530,24 @@ under the License. 
<artifactId>iceberg-aws</artifactId> </dependency> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-core</artifactId> + <version>${paimon.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-common</artifactId> + <version>${paimon.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-format</artifactId> + <version>${paimon.version}</version> + </dependency> + <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>glue</artifactId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index c79acc79df..95f8873c60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -134,7 +134,8 @@ public interface TableIf { enum TableType { MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC, TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE, - ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, HUDI_EXTERNAL_TABLE; + ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, + HUDI_EXTERNAL_TABLE; public String toEngineName() { switch (this) { @@ -198,6 +199,7 @@ public interface TableIf { case HMS_EXTERNAL_TABLE: case ES_EXTERNAL_TABLE: case ICEBERG_EXTERNAL_TABLE: + case PAIMON_EXTERNAL_TABLE: return "EXTERNAL TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java new file mode 100644 index 0000000000..ac6d6932c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java @@ -0,0 
+1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.external; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitDatabaseLog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.persist.gson.GsonPostProcessable; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable> implements GsonPostProcessable { + + private static final Logger LOG = LogManager.getLogger(PaimonExternalDatabase.class); + + public PaimonExternalDatabase(ExternalCatalog extCatalog, Long id, String name) { + super(extCatalog, id, name, InitDatabaseLog.Type.PAIMON); + } + + @Override + protected PaimonExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog); + } + + @Override + public List<PaimonExternalTable> getTablesOnIdOrder() { + // Sort the name instead, 
because the id may change. + return getTables().stream().sorted(Comparator.comparing(TableIf::getName)).collect(Collectors.toList()); + } + + @Override + public void dropTable(String tableName) { + LOG.debug("drop table [{}]", tableName); + makeSureInitialized(); + Long tableId = tableNameToId.remove(tableName); + if (tableId == null) { + LOG.warn("drop table [{}] failed", tableName); + } + idToTbl.remove(tableId); + } + + @Override + public void createTable(String tableName, long tableId) { + LOG.debug("create table [{}]", tableName); + makeSureInitialized(); + tableNameToId.put(tableName, tableId); + PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name, + (PaimonExternalCatalog) extCatalog); + idToTbl.put(tableId, table); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java new file mode 100644 index 0000000000..c821160dd4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog.external; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.thrift.THiveTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.AbstractFileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; + +import java.util.HashMap; +import java.util.List; + +public class PaimonExternalTable extends ExternalTable { + + private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); + + public static final int PAIMON_DATETIME_SCALE_MS = 3; + private Table originTable = null; + + public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { + super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + } + + public String getPaimonCatalogType() { + return ((PaimonExternalCatalog) catalog).getPaimonCatalogType(); + } + + protected synchronized void makeSureInitialized() { + super.makeSureInitialized(); + if (!objectCreated) { + objectCreated = true; + } + } + + public Table getOriginTable() { + if (originTable == null) { + originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); + } + return originTable; + } + + @Override + public List<Column> initSchema() { + Table table = getOriginTable(); + TableSchema schema = ((AbstractFileStoreTable) table).schema(); + List<DataField> columns = schema.fields(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (DataField field : columns) { + tmpSchema.add(new Column(field.name(), + 
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id())); + } + return tmpSchema; + } + + private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return Type.BOOLEAN; + case INTEGER: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case VARCHAR: + case BINARY: + case CHAR: + return Type.STRING; + case DECIMAL: + DecimalType decimal = (DecimalType) dataType; + return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); + case DATE: + return ScalarType.createDateV2Type(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return ScalarType.createDatetimeV2Type(PAIMON_DATETIME_SCALE_MS); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + throw new IllegalArgumentException("Cannot transform unknown type: " + dataType.getTypeRoot()); + } + } + + protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + @Override + public TTableDescriptor toThrift() { + List<Column> schema = getFullSchema(); + if (getPaimonCatalogType().equals("hms")) { + THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0, + getName(), dbName); + tTableDescriptor.setHiveTable(tHiveTable); + return tTableDescriptor; + } else { + throw new IllegalArgumentException("Currently only supports hms catalog,not support :" + + getPaimonCatalogType()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index f530bcc5f8..358b53a274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Resource; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory; +import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog; import org.apache.doris.datasource.test.TestExternalCatalog; import com.google.common.base.Strings; @@ -122,6 +123,9 @@ public class CatalogFactory { case "iceberg": catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment); break; + case "paimon": + catalog = new PaimonHMSExternalCatalog(catalogId, name, resource, props, comment); + break; case "max_compute": catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index a23842c9bd..cf2de86494 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.HMSExternalDatabase; import org.apache.doris.catalog.external.IcebergExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.catalog.external.MaxComputeExternalDatabase; +import org.apache.doris.catalog.external.PaimonExternalDatabase; import org.apache.doris.catalog.external.TestExternalDatabase; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; @@ -448,6 +449,8 @@ public abstract class ExternalCatalog //return new HudiExternalDatabase(this, dbId, dbName); case TEST: return new TestExternalDatabase(this, dbId, dbName); + case PAIMON: + return new PaimonExternalDatabase(this, dbId, dbName); default: break; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java index ecc284b325..73fbeeb781 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -37,6 +37,7 @@ public class InitCatalogLog implements Writable { ES, JDBC, ICEBERG, + PAIMON, MAX_COMPUTE, HUDI, TEST, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java index a49dd5232c..14cd4410ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java @@ -39,6 +39,7 @@ public class InitDatabaseLog implements Writable { JDBC, MAX_COMPUTE, HUDI, + PAIMON, TEST, UNKNOWN; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java new file mode 100644 index 0000000000..3c024fadfb --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common base for Doris external catalogs that are backed by an Apache Paimon catalog.
+ *
+ * <p>It holds the Paimon {@link Catalog} handle and answers database/table listing and
+ * lookup requests through it. This class never assigns {@link #catalog} or
+ * {@link #paimonCatalogType} itself; concrete subclasses are expected to do so.
+ */
+public abstract class PaimonExternalCatalog extends ExternalCatalog {
+
+    private static final Logger LOG = LogManager.getLogger(PaimonExternalCatalog.class);
+    public static final String PAIMON_HMS = "hms";
+    protected String paimonCatalogType;
+    protected Catalog catalog;
+
+    public PaimonExternalCatalog(long catalogId, String name, String comment) {
+        super(catalogId, name, InitCatalogLog.Type.PAIMON, comment);
+        this.type = "paimon";
+    }
+
+    @Override
+    protected void init() {
+        super.init();
+    }
+
+    /**
+     * Builds a Hadoop configuration that carries every entry returned by
+     * {@code catalogProperty.getHadoopProperties()}.
+     *
+     * @return a fresh {@link HdfsConfiguration} populated with the catalog's Hadoop properties
+     */
+    protected Configuration getConfiguration() {
+        Configuration conf = new HdfsConfiguration();
+        Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
+        for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
+
+    /**
+     * @return the underlying Paimon catalog, after making sure this catalog is initialized
+     */
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    /**
+     * @return the Paimon catalog flavour (for example {@link #PAIMON_HMS}), after making sure
+     *         this catalog is initialized
+     */
+    public String getPaimonCatalogType() {
+        makeSureInitialized();
+        return paimonCatalogType;
+    }
+
+    /**
+     * Lists the databases known to the Paimon catalog.
+     *
+     * <p>Unlike the other accessors this method does not call {@code makeSureInitialized()};
+     * NOTE(review): presumably it is only invoked while the catalog is being initialized,
+     * confirm against the caller.
+     *
+     * @return a mutable copy of the database names reported by the Paimon catalog
+     */
+    protected List<String> listDatabaseNames() {
+        return new ArrayList<>(catalog.listDatabases());
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
+        makeSureInitialized();
+        return catalog.tableExists(Identifier.create(dbName, tblName));
+    }
+
+    /**
+     * Lists the tables of one database.
+     *
+     * @param ctx session context (not used by this implementation)
+     * @param dbName name of the Paimon database
+     * @return the table names; an empty list (never {@code null}) if the database does not exist
+     */
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        try {
+            return catalog.listTables(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            LOG.warn("failed to list tables, paimon database does not exist: " + dbName, e);
+            // A missing database has no tables. Returning an empty list keeps callers that
+            // iterate over the result from failing with a NullPointerException.
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Looks up a Paimon table through the underlying catalog.
+     *
+     * @param dbName name of the Paimon database
+     * @param tblName name of the table inside {@code dbName}
+     * @return the Paimon table, or {@code null} if it does not exist (a warning is logged)
+     */
+    public org.apache.paimon.table.Table getPaimonTable(String dbName, String tblName) {
+        makeSureInitialized();
+        org.apache.paimon.table.Table table = null;
+        try {
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (Catalog.TableNotExistException e) {
+            LOG.warn("paimon table does not exist: " + dbName + "." + tblName, e);
+        }
+        return table;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
new file mode 100644
index 0000000000..13775b0edf
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonHMSExternalCatalog.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Paimon external catalog that talks to a Hive Metastore: it builds a Paimon
+ * {@link HiveCatalog} from the catalog's warehouse and metastore-URI properties.
+ */
+public class PaimonHMSExternalCatalog extends PaimonExternalCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonHMSExternalCatalog.class);
+    public static final String METASTORE = "metastore";
+    public static final String METASTORE_HIVE = "hive";
+    public static final String URI = "uri";
+    private static final ConfigOption<String> METASTORE_CLIENT_CLASS =
+            ConfigOptions.key("metastore.client.class")
+                    .stringType()
+                    .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
+                    .withDescription(
+                            "Class name of Hive metastore client.\n"
+                                    + "NOTE: This class must directly implements "
+                                    + "org.apache.hadoop.hive.metastore.IMetaStoreClient.");
+
+    /**
+     * @param catalogId id of this catalog
+     * @param name catalog name
+     * @param resource resource handed to {@link CatalogProperty}
+     * @param props user supplied properties; converted with
+     *              {@code PropertyConverter.convertToMetaProperties} before being stored
+     * @param comment catalog comment
+     */
+    public PaimonHMSExternalCatalog(long catalogId, String name, String resource,
+            Map<String, String> props, String comment) {
+        super(catalogId, name, comment);
+        Map<String, String> metaProps = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, metaProps);
+        paimonCatalogType = PAIMON_HMS;
+    }
+
+    /**
+     * Creates the Paimon catalog from the "warehouse" and Hive metastore URI properties
+     * (both default to the empty string when absent).
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if the catalog cannot be created
+     */
+    @Override
+    protected void initLocalObjectsImpl() {
+        String hmsUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
+        String warehouseLocation = catalogProperty.getOrDefault(PaimonProperties.WAREHOUSE, "");
+
+        Options paimonOptions = new Options();
+        paimonOptions.set(PaimonProperties.WAREHOUSE, warehouseLocation);
+        // Hive is the only metastore flavour wired up here.
+        paimonOptions.set(METASTORE, METASTORE_HIVE);
+        paimonOptions.set(URI, hmsUris);
+
+        try {
+            catalog = create(CatalogContext.create(paimonOptions, getConfiguration()));
+        } catch (IOException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Assembles a {@link HiveCatalog} for the warehouse and metastore described by {@code context}.
+     *
+     * @param context catalog context carrying the Paimon options
+     * @return the newly created Hive-backed Paimon catalog
+     * @throws IOException declared for the file-IO lookup on the warehouse path
+     */
+    private Catalog create(CatalogContext context) throws IOException {
+        Options options = context.options();
+        String warehouse = options.get(CatalogOptions.WAREHOUSE);
+        FileIO fileIO = FileIO.get(new Path(warehouse), context);
+
+        String metastoreUri = options.get(CatalogOptions.URI);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(
+                options.get(HiveCatalogOptions.HIVE_CONF_DIR),
+                options.get(HiveCatalogOptions.HADOOP_CONF_DIR));
+
+        // User supplied options take precedence over whatever hive-site.xml provided ...
+        for (Map.Entry<String, String> option : options.toMap().entrySet()) {
+            hiveConf.set(option.getKey(), option.getValue());
+        }
+        // ... and the metastore URI and warehouse location are then set explicitly on top.
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri);
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse);
+
+        return new HiveCatalog(fileIO, hiveConf, options.get(METASTORE_CLIENT_CLASS), options.toMap());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
similarity index 70%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 6fc5d69544..e372dd5788 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -15,20 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.datasource.property.constants;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi");
-
-    private final String tableFormatType;
-
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
-
-    public String value() {
-        return tableFormatType;
-    }
+public class PaimonProperties {
+    public static final String WAREHOUSE = "warehouse";
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5d2b3c8f36..03f95dff5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -47,6 +47,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.catalog.external.IcebergExternalTable;
+import
org.apache.doris.catalog.external.PaimonExternalTable; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; @@ -157,6 +158,7 @@ import org.apache.doris.planner.UnionNode; import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.HudiScanNode; import org.apache.doris.planner.external.iceberg.IcebergScanNode; +import org.apache.doris.planner.external.paimon.PaimonScanNode; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TFetchOption; @@ -717,6 +719,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } } else if (table instanceof IcebergExternalTable) { scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + } else if (table instanceof PaimonExternalTable) { + scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } Preconditions.checkNotNull(scanNode); fileScan.getConjuncts().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index bf7c0a5484..413a96eac2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -51,6 +51,8 @@ import org.apache.doris.catalog.external.JdbcExternalDatabase; import org.apache.doris.catalog.external.JdbcExternalTable; import org.apache.doris.catalog.external.MaxComputeExternalDatabase; import org.apache.doris.catalog.external.MaxComputeExternalTable; +import org.apache.doris.catalog.external.PaimonExternalDatabase; +import org.apache.doris.catalog.external.PaimonExternalTable; import org.apache.doris.common.util.RangeUtils; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.EsExternalCatalog; @@ -63,6 +65,8 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import org.apache.doris.load.routineload.AbstractDataSourceProperties; @@ -194,6 +198,8 @@ public class GsonUtils { .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) + .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName()) + .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName()) .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()); // routine load data source private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory = @@ -208,6 +214,7 @@ public class GsonUtils { .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName()) .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName()) .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName()) + .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName()) .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName()); private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = 
RuntimeTypeAdapterFactory.of( @@ -216,6 +223,7 @@ public class GsonUtils { .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName()) .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName()) .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName()) + .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName()) .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName()); // runtime adapter for class "HeartbeatResponse" diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 0ae4a35edb..97de7d82ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -73,6 +73,7 @@ import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.HudiScanNode; import org.apache.doris.planner.external.MaxComputeScanNode; import org.apache.doris.planner.external.iceberg.IcebergScanNode; +import org.apache.doris.planner.external.paimon.PaimonScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.statistics.StatisticalType; @@ -2019,6 +2020,9 @@ public class SingleNodePlanner { case ICEBERG_EXTERNAL_TABLE: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; + case PAIMON_EXTERNAL_TABLE: + scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + break; case MAX_COMPUTE_EXTERNAL_TABLE: // TODO: support max compute scan node scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode", diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 508b174fb5..4c794d083f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -38,6 +38,8 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.iceberg.IcebergScanNode; import org.apache.doris.planner.external.iceberg.IcebergSplit; +import org.apache.doris.planner.external.paimon.PaimonScanNode; +import org.apache.doris.planner.external.paimon.PaimonSplit; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; @@ -279,6 +281,8 @@ public abstract class FileQueryScanNode extends FileScanNode { if (fileSplit instanceof IcebergSplit) { // TODO: extract all data lake split to factory IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) fileSplit); + } else if (fileSplit instanceof PaimonSplit) { + PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit); } // if (fileSplit instanceof HudiSplit) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java index 6fc5d69544..f97c8ea1ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java @@ -20,7 +20,8 @@ package org.apache.doris.planner.external; public enum TableFormatType { HIVE("hive"), ICEBERG("iceberg"), - HUDI("hudi"); + HUDI("hudi"), + PAIMON("paimon"); private final String tableFormatType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java new file mode 100644 index 0000000000..13da29b2bd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scan node for Paimon external tables. Splits are planned with Paimon's own scan API and
+ * shipped to the backend in serialized form; the file format reported to the backend is
+ * {@code FORMAT_JNI}.
+ */
+public class PaimonScanNode extends FileQueryScanNode {
+    // NOTE(review): this field is static, so it is shared by every PaimonScanNode instance.
+    // doInitialize() overwrites it and the static setPaimonParams() reads it, which means two
+    // scan nodes over different tables can observe each other's source. It cannot be made an
+    // instance field without changing the static setPaimonParams() entry point, which is
+    // called from outside this class.
+    private static PaimonSource source = null;
+
+    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    /**
+     * Rejects views, then prepares column filters, the backend policy, the Paimon source and
+     * the schema parameters.
+     *
+     * @throws AnalysisException if the target table is a view
+     */
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    /**
+     * Fills {@code rangeDesc} with everything the backend JNI reader needs for one Paimon split:
+     * the serialized split and its length, the comma separated names, types and field ids of the
+     * scanned columns, plus the metastore URI, warehouse, database and table name.
+     *
+     * @param rangeDesc range descriptor to receive the table-format parameters
+     * @param paimonSplit the split this range describes
+     */
+    public static void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
+        TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+        // Serialize the split once and reuse the bytes for both the payload and its length.
+        byte[] serializedSplit = paimonSplit.getSerializableSplit();
+        fileDesc.setPaimonSplit(serializedSplit);
+        fileDesc.setLengthByte(Integer.toString(serializedSplit.length));
+        // Paimon column names, types and ids that need to be transported into JNI
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        StringBuilder columnTypesBuilder = new StringBuilder();
+        StringBuilder columnIdsBuilder = new StringBuilder();
+        // Field name -> Paimon field id / Paimon type string, taken from the table schema.
+        Map<String, Integer> paimonFieldsId = new HashMap<>();
+        Map<String, String> paimonFieldsType = new HashMap<>();
+        for (DataField field : ((AbstractFileStoreTable) source.getPaimonTable()).schema().fields()) {
+            paimonFieldsId.put(field.name(), field.id());
+            paimonFieldsType.put(field.name(), field.type().toString());
+        }
+        boolean isFirst = true;
+        for (SlotDescriptor slot : source.getDesc().getSlots()) {
+            if (!isFirst) {
+                columnNamesBuilder.append(",");
+                columnTypesBuilder.append(",");
+                columnIdsBuilder.append(",");
+            }
+            String columnName = slot.getColumn().getName();
+            columnNamesBuilder.append(columnName);
+            // NOTE(review): the type strings are joined with ',' — confirm that no Paimon type
+            // renders with a comma of its own, since the receiving side splits on ','.
+            columnTypesBuilder.append(paimonFieldsType.get(columnName));
+            columnIdsBuilder.append(paimonFieldsId.get(columnName));
+            isFirst = false;
+        }
+        fileDesc.setPaimonColumnIds(columnIdsBuilder.toString());
+        fileDesc.setPaimonColumnNames(columnNamesBuilder.toString());
+        fileDesc.setPaimonColumnTypes(columnTypesBuilder.toString());
+        fileDesc.setHiveMetastoreUris(source.getCatalog().getCatalogProperty().getProperties()
+                .get(HiveConf.ConfVars.METASTOREURIS.varname));
+        fileDesc.setWarehouse(source.getCatalog().getCatalogProperty().getProperties()
+                .get(PaimonProperties.WAREHOUSE));
+        fileDesc.setDbName(((PaimonExternalTable) source.getTargetTable()).getDbName());
+        fileDesc.setTableName(source.getTargetTable().getName());
+        tableFormatFileDesc.setPaimonParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    /**
+     * Plans the scan with Paimon and wraps every resulting split in a {@link PaimonSplit} whose
+     * path is the table location.
+     */
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
+        List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.newScan().plan().splits();
+        for (org.apache.paimon.table.source.Split split : paimonSplits) {
+            // NOTE(review): "tempDir" looks like a placeholder path for the Hive input split
+            // wrapper — confirm it is never dereferenced.
+            PaimonInputSplit inputSplit = new PaimonInputSplit(
+                        "tempDir",
+                        (DataSplit) split
+            );
+            PaimonSplit paimonSplit = new PaimonSplit(inputSplit,
+                        ((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+            paimonSplit.setTableFormatType(TableFormatType.PAIMON);
+            splits.add(paimonSplit);
+        }
+        return splits;
+    }
+
+    /**
+     * Maps the table location to a backend file type.
+     *
+     * @return {@code FILE_S3} for object storage, {@code FILE_HDFS} or {@code FILE_LOCAL} by prefix
+     * @throws DdlException if the location matches none of the known schemes
+     */
+    @Override
+    public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        if (location != null && !location.isEmpty()) {
+            if (S3Util.isObjStorage(location)) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
+            }
+        }
+        throw new DdlException("Unknown file location " + location
+                + " for paimon table " + source.getPaimonTable().name());
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    @Override
+    public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
+        return new ArrayList<>(source.getPaimonTable().partitionKeys());
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return source.getFileAttributes();
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return source.getTargetTable();
+    }
+
+    @Override
+    public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
+        return source.getCatalog().getProperties();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
new file mode 100644
index 0000000000..2f55e30c08
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.PaimonExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.thrift.TFileAttributes;
+
+import org.apache.paimon.table.Table;
+
+import java.util.Map;
+
+/**
+ * Bundles what a Paimon scan needs to know about its target: the Doris-side external table,
+ * the Paimon table obtained from it, and the tuple descriptor of the scan.
+ */
+public class PaimonSource {
+    private final TupleDescriptor desc;
+    private final PaimonExternalTable paimonExtTable;
+    private final Table originTable;
+
+    /**
+     * @param table the Doris external table; its origin table is fetched once, here
+     * @param desc tuple descriptor of the scan
+     * @param columnNameToRange accepted but not used by this constructor
+     */
+    public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        this.desc = desc;
+        this.paimonExtTable = table;
+        this.originTable = table.getOriginTable();
+    }
+
+    /** @return the Doris-side table being scanned */
+    public TableIf getTargetTable() {
+        return paimonExtTable;
+    }
+
+    /** @return the Paimon table captured at construction time */
+    public Table getPaimonTable() {
+        return originTable;
+    }
+
+    /** @return the tuple descriptor passed to the constructor */
+    public TupleDescriptor getDesc() {
+        return desc;
+    }
+
+    /** @return the catalog that owns the target table */
+    public ExternalCatalog getCatalog() {
+        return paimonExtTable.getCatalog();
+    }
+
+    /** @return a new, empty set of file attributes */
+    public TFileAttributes getFileAttributes() throws UserException {
+        return new TFileAttributes();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
new file mode 100644
index 0000000000..e36740f0fd
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.paimon;
+
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.TableFormatType;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link FileSplit} that carries a Paimon input split. The file range fields of the base
+ * class are all passed as zero/null; the actual work description is the wrapped
+ * {@link PaimonInputSplit}, which is shipped in serialized form.
+ */
+public class PaimonSplit extends FileSplit {
+    private PaimonInputSplit split;
+    private TableFormatType tableFormatType;
+
+    /**
+     * @param split the Paimon split to wrap
+     * @param path path recorded on the base {@link FileSplit}
+     */
+    public PaimonSplit(PaimonInputSplit split, String path) {
+        super(new Path(path), 0, 0, 0, null, null);
+        this.split = split;
+    }
+
+    public PaimonInputSplit getSplit() {
+        return split;
+    }
+
+    public void setSplit(PaimonInputSplit split) {
+        this.split = split;
+    }
+
+    public TableFormatType getTableFormatType() {
+        return tableFormatType;
+    }
+
+    public void setTableFormatType(TableFormatType tableFormatType) {
+        this.tableFormatType = tableFormatType;
+    }
+
+    /**
+     * Serializes the wrapped split with its own {@code write(DataOutput)} method.
+     *
+     * @return the serialized bytes of the split
+     * @throws RuntimeException if serialization fails; a partially written buffer is never
+     *         returned, because the receiver could not deserialize it
+     */
+    public byte[] getSerializableSplit() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // try-with-resources flushes the DataOutputStream before the bytes are read back.
+        try (DataOutputStream output = new DataOutputStream(baos)) {
+            split.write(output);
+        } catch (IOException e) {
+            throw new RuntimeException("failed to serialize paimon split", e);
+        }
+        return baos.toByteArray();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index 35391191c7..ba22e067a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java @@ -51,6 +51,7 @@ public class DeriveFactory { case ES_SCAN_NODE: case HIVE_SCAN_NODE: case ICEBERG_SCAN_NODE: + case PAIMON_SCAN_NODE: case INTERSECT_NODE: case SCHEMA_SCAN_NODE: case STREAM_LOAD_SCAN_NODE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 3a4a283c79..67dd9bb054 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -31,6 +31,7 @@ public enum StatisticalType { HASH_JOIN_NODE, HIVE_SCAN_NODE, ICEBERG_SCAN_NODE, + PAIMON_SCAN_NODE, HUDI_SCAN_NODE, TVF_SCAN_NODE, INTERSECT_NODE, diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 66d0123795..bcebf3871b 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -37,6 +37,7 @@ under the License. <presto.hadoop.version>2.7.4-11</presto.hadoop.version> <presto.hive.version>3.0.0-8</presto.hive.version> <hudi.version>0.12.2</hudi.version> + <paimon.version>0.4-SNAPSHOT</paimon.version> </properties> <dependencies> @@ -50,6 +51,21 @@ under the License. <artifactId>fe-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-bundle</artifactId> + <version>${paimon.version}</version> + </dependency> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-hive-connector-2.3</artifactId> + <version>${paimon.version}</version> + </dependency> + <dependency> + <artifactId>hive-common</artifactId> + <groupId>org.apache.hive</groupId> + <version>2.3.9</version> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> @@ -162,10 +178,7 @@ under the License. 
</exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>hive-catalog-shade</artifactId> - </dependency> + </dependencies> <build> <finalName>java-udf</finalName> diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java new file mode 100644 index 0000000000..03c8b6564e --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/PaimonJniScanner.java @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.jni;
+
+import org.apache.doris.jni.utils.OffHeap;
+import org.apache.doris.jni.vec.ColumnType;
+import org.apache.doris.jni.vec.PaimonColumnValue;
+import org.apache.doris.jni.vec.ScanPredicate;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.Logger;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ColumnarRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.HiveCatalogOptions;
+import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+
+/**
+ * JNI scanner that reads one Paimon split. The split arrives as serialized bytes at a native
+ * memory address; the table is looked up again on this side through a Hive-backed Paimon catalog.
+ */
+public class PaimonJniScanner extends JniScanner {
+    private static final Logger LOG = Logger.getLogger(PaimonJniScanner.class);
+
+    private final String metastoreUris;
+    private final String warehouse;
+    private final String dbName;
+    private final String tblName;
+    // Paimon field ids of the required columns, as decimal strings, in required-field order.
+    private final String[] ids;
+    // Native address and byte length of the serialized PaimonInputSplit.
+    private final long splitAddress;
+    private final int lengthByte;
+    private PaimonInputSplit paimonInputSplit;
+    private Table table;
+    private RecordReader<InternalRow> reader;
+    private final PaimonColumnValue columnValue = new PaimonColumnValue();
+
+    /**
+     * @param batchSize batch size forwarded to {@code initTableInfo}
+     * @param params scanner parameters. Keys read here: {@code hive.metastore.uris},
+     *               {@code warehouse}, {@code split_byte}, {@code length_byte}, {@code db_name},
+     *               {@code table_name}, {@code required_fields}, {@code columns_types},
+     *               {@code columns_id} and, optionally, {@code push_down_predicates}
+     */
+    public PaimonJniScanner(int batchSize, Map<String, String> params) {
+        metastoreUris = params.get("hive.metastore.uris");
+        warehouse = params.get("warehouse");
+        splitAddress = Long.parseLong(params.get("split_byte"));
+        lengthByte = Integer.parseInt(params.get("length_byte"));
+        LOG.info("splitAddress:" + splitAddress);
+        LOG.info("lengthByte:" + lengthByte);
+        dbName = params.get("db_name");
+        tblName = params.get("table_name");
+        String[] requiredFields = params.get("required_fields").split(",");
+        // NOTE(review): a type string that itself contains ',' would be split into several
+        // entries here — confirm the sender never emits such types.
+        String[] types = params.get("columns_types").split(",");
+        ids = params.get("columns_id").split(",");
+        ColumnType[] columnTypes = new ColumnType[types.length];
+        for (int i = 0; i < types.length; i++) {
+            columnTypes[i] = ColumnType.parseType(requiredFields[i], types[i]);
+        }
+        ScanPredicate[] predicates = new ScanPredicate[0];
+        if (params.containsKey("push_down_predicates")) {
+            long predicatesAddress = Long.parseLong(params.get("push_down_predicates"));
+            if (predicatesAddress != 0) {
+                predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("PaimonJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+            }
+        }
+        initTableInfo(columnTypes, requiredFields, predicates, batchSize);
+    }
+
+    /**
+     * Resolves the table, copies the serialized split out of native memory, deserializes it and
+     * creates the record reader for it.
+     *
+     * @throws IOException if the split cannot be deserialized or the reader cannot be created
+     */
+    @Override
+    public void open() throws IOException {
+        getCatalog();
+        // Copy the serialized split from the native address into a Java byte array.
+        byte[] splitByte = new byte[lengthByte];
+        OffHeap.copyMemory(null, splitAddress, splitByte, OffHeap.BYTE_ARRAY_OFFSET, lengthByte);
+        ByteArrayInputStream bais = new ByteArrayInputStream(splitByte);
+        DataInputStream input = new DataInputStream(bais);
+        // Let a deserialization failure propagate: continuing with a split that was never
+        // populated would only fail later in a less obvious place.
+        paimonInputSplit.readFields(input);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        reader = read.createReader(paimonInputSplit.split());
+    }
+
+    @Override
+    public void close() throws IOException {
+        // reader is still null when open() failed before creating it.
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /**
+     * Reads every remaining batch of the split and appends the required columns of each row.
+     *
+     * <p>NOTE(review): the loop drains the whole split in a single call and does not consult
+     * the batch size given to the constructor — confirm the base class copes with that.
+     *
+     * @return the number of rows appended
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    @Override
+    protected int getNext() throws IOException {
+        // Parse the field indexes once per call instead of once per cell.
+        int[] fieldIndexes = new int[ids.length];
+        for (int i = 0; i < ids.length; i++) {
+            fieldIndexes[i] = Integer.parseInt(ids[i]);
+        }
+        int rows = 0;
+        try {
+            RecordReader.RecordIterator batch;
+            while ((batch = reader.readBatch()) != null) {
+                try {
+                    Object record;
+                    while ((record = batch.next()) != null) {
+                        columnValue.setOffsetRow((ColumnarRow) record);
+                        for (int i = 0; i < fieldIndexes.length; i++) {
+                            columnValue.setIdx(fieldIndexes[i]);
+                            appendData(i, columnValue);
+                        }
+                        rows++;
+                    }
+                } finally {
+                    // Release the batch even when reading a row fails.
+                    batch.releaseBatch();
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("failed to getNext columnValue ", e);
+            throw new RuntimeException(e);
+        }
+        return rows;
+    }
+
+    /**
+     * Assembles a {@link HiveCatalog} for the warehouse and metastore described by {@code context}.
+     *
+     * @param context catalog context carrying the Paimon options
+     * @return the newly created Hive-backed Paimon catalog
+     * @throws IOException declared for the file-IO lookup on the warehouse path
+     */
+    private Catalog create(CatalogContext context) throws IOException {
+        Path warehousePath = new Path(context.options().get(CatalogOptions.WAREHOUSE));
+        FileIO fileIO;
+        fileIO = FileIO.get(warehousePath, context);
+        String uri = context.options().get(CatalogOptions.URI);
+        String hiveConfDir = context.options().get(HiveCatalogOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = context.options().get(HiveCatalogOptions.HADOOP_CONF_DIR);
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir);
+
+        // always using user-set parameters overwrite hive-site.xml parameters
+        context.options().toMap().forEach(hiveConf::set);
+        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        // set the warehouse location to the hiveConf
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, context.options().get(CatalogOptions.WAREHOUSE));
+
+        String clientClassName = context.options().get(METASTORE_CLIENT_CLASS);
+
+        return new HiveCatalog(fileIO, hiveConf, clientClassName, context.options().toMap());
+    }
+
+    /**
+     * Despite its name this method returns nothing: it creates an empty split holder, builds a
+     * Hive-backed Paimon catalog from the warehouse and metastore URI, and stores the looked-up
+     * table in {@link #table}.
+     *
+     * @throws RuntimeException if the catalog cannot be created or the table does not exist
+     */
+    private void getCatalog() {
+        paimonInputSplit = new PaimonInputSplit();
+        Options options = new Options();
+        options.set("warehouse", warehouse);
+        // Currently, only supports hive
+        options.set("metastore", "hive");
+        options.set("uri", metastoreUris);
+        CatalogContext context = CatalogContext.create(options);
+        try {
+            Catalog catalog = create(context);
+            table = catalog.getTable(Identifier.create(dbName, tblName));
+        } catch (IOException | Catalog.TableNotExistException e) {
+            LOG.warn("failed to create paimon external catalog ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static
final ConfigOption<String> METASTORE_CLIENT_CLASS = + ConfigOptions.key("metastore.client.class") + .stringType() + .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") + .withDescription( + "Class name of Hive metastore client.\n" + + "NOTE: This class must directly implements " + + "org.apache.hadoop.hive.metastore.IMetaStoreClient."); +} diff --git a/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java new file mode 100644 index 0000000000..3d8bb5e42e --- /dev/null +++ b/fe/java-udf/src/main/java/org/apache/doris/jni/vec/PaimonColumnValue.java @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.jni.vec; + +import org.apache.paimon.data.columnar.ColumnarRow; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; + +public class PaimonColumnValue implements ColumnValue { + private int idx; + private ColumnarRow record; + + public PaimonColumnValue() { + } + + public void setIdx(int idx) { + this.idx = idx; + } + + public void setOffsetRow(ColumnarRow record) { + this.record = record; + } + + @Override + public boolean getBoolean() { + return record.getBoolean(idx); + } + + @Override + public byte getByte() { + return record.getByte(idx); + } + + @Override + public short getShort() { + return record.getShort(idx); + } + + @Override + public int getInt() { + return record.getInt(idx); + } + + @Override + public float getFloat() { + return record.getFloat(idx); + } + + @Override + public long getLong() { + return record.getLong(idx); + } + + @Override + public double getDouble() { + return record.getDouble(idx); + } + + @Override + public BigInteger getBigInteger() { + return BigInteger.valueOf(record.getInt(idx)); + } + + @Override + public BigDecimal getDecimal() { + return BigDecimal.valueOf(getDouble()); + } + + @Override + public String getString() { + return record.getString(idx).toString(); + } + + @Override + public LocalDate getDate() { + return Instant.ofEpochMilli(record.getTimestamp(idx, 3) + .getMillisecond()).atZone(ZoneOffset.ofHours(8)).toLocalDate(); + } + + @Override + public LocalDateTime getDateTime() { + return Instant.ofEpochMilli(record.getTimestamp(idx, 3) + .getMillisecond()).atZone(ZoneOffset.ofHours(8)).toLocalDateTime(); + } + + @Override + public boolean isNull() { + return true; + } + + @Override + public byte[] getBytes() { + return record.getBinary(idx); + } + + @Override + public void unpackArray(List<ColumnValue> values) { + + } + + @Override + public 
void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) { + + } + + @Override + public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) { + + } +} diff --git a/fe/pom.xml b/fe/pom.xml index e2da4103d3..5cb5f36482 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -195,7 +195,7 @@ under the License. <doris.home>${fe.dir}/../</doris.home> <revision>1.2-SNAPSHOT</revision> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <doris.hive.catalog.shade.version>1.0.3-SNAPSHOT</doris.hive.catalog.shade.version> + <doris.hive.catalog.shade.version>1.0.4-SNAPSHOT</doris.hive.catalog.shade.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <!--plugin parameters--> diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 9c98cd4d28..d68007e69a 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -186,8 +186,8 @@ struct TBrokerScanRangeParams { 1: required i8 column_separator; 2: required i8 line_delimiter; - // We construct one line in file to a tuple. And each field of line - // correspond to a slot in this tuple. + // We construct one line in file to a tuple. And each field of line + // correspond to a slot in this tuple. 
// src_tuple_id is the tuple id of the input file 3: required Types.TTupleId src_tuple_id // src_slot_ids is the slot_ids of the input file @@ -288,6 +288,19 @@ struct TIcebergFileDesc { 5: optional Exprs.TExpr file_select_conjunct; } +struct TPaimonFileDesc { + 1: optional binary paimon_split + 2: optional string paimon_column_ids + 3: optional string paimon_column_types + 4: optional string paimon_column_names + 5: optional string hive_metastore_uris + 6: optional string warehouse + 7: optional string db_name + 8: optional string table_name + 9: optional string length_byte +} + + struct THudiFileDesc { 1: optional string basePath; 2: optional string dataFilePath; @@ -300,6 +313,7 @@ struct TTableFormatFileDesc { 1: optional string table_format_type 2: optional TIcebergFileDesc iceberg_params 3: optional THudiFileDesc hudi_params + 4: optional TPaimonFileDesc paimon_params } struct TFileScanRangeParams { @@ -566,7 +580,7 @@ struct TSortInfo { 4: optional list<Exprs.TExpr> sort_tuple_slot_exprs // Indicates the nullable info of sort_tuple_slot_exprs is changed after substitute by child's smap - 5: optional list<bool> slot_exprs_nullability_changed_flags + 5: optional list<bool> slot_exprs_nullability_changed_flags // Indicates whether topn query using two phase read 6: optional bool use_two_phase_read } @@ -606,7 +620,7 @@ struct TEqJoinCondition { // right-hand side of "<a> = <b>" 2: required Exprs.TExpr right; // operator of equal join - 3: optional Opcodes.TExprOpcode opcode; + 3: optional Opcodes.TExprOpcode opcode; } enum TJoinOp { @@ -709,7 +723,7 @@ enum TAggregationOp { DENSE_RANK, ROW_NUMBER, LAG, - HLL_C, + HLL_C, BITMAP_UNION, NTILE, } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org