This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 637c46ca0356454082a3e288233e8244bc8114ad Author: Yulei-Yang <yulei.yang0...@gmail.com> AuthorDate: Sun Dec 25 21:57:18 2022 +0800 [Improvement](multi-catalog) support hive external tables which store data on tencent chdfs (#15297) * support reading hive tables which store data on tencent chdfs in multi-catalog --- .../docs/ecosystem/external-table/hive-of-doris.md | 1 + .../docs/ecosystem/external-table/multi-catalog.md | 1 + .../docs/ecosystem/external-table/hive-of-doris.md | 1 + .../docs/ecosystem/external-table/multi-catalog.md | 1 + .../java/org/apache/doris/catalog/BrokerMgr.java | 21 +++++++++++++++++ .../org/apache/doris/planner/BrokerScanNode.java | 9 ++++++-- .../doris/planner/external/HiveScanProvider.java | 2 ++ .../doris/planner/external/QueryScanProvider.java | 26 ++++++++++++++++++++-- 8 files changed, 58 insertions(+), 4 deletions(-) diff --git a/docs/en/docs/ecosystem/external-table/hive-of-doris.md b/docs/en/docs/ecosystem/external-table/hive-of-doris.md index ed7f4438d9..a68b6ce157 100644 --- a/docs/en/docs/ecosystem/external-table/hive-of-doris.md +++ b/docs/en/docs/ecosystem/external-table/hive-of-doris.md @@ -31,6 +31,7 @@ Hive External Table of Doris provides Doris with direct access to Hive external 1. support for Hive data sources to access Doris 2. Support joint queries between Doris and Hive data sources to perform more complex analysis operations 3. Support access to kerberos-enabled Hive data sources + 4. Support access to Hive tables whose data is stored on Tencent CHDFS This document introduces how to use this feature and the considerations. diff --git a/docs/en/docs/ecosystem/external-table/multi-catalog.md b/docs/en/docs/ecosystem/external-table/multi-catalog.md index 3be2f3bba0..2eff944c35 100644 --- a/docs/en/docs/ecosystem/external-table/multi-catalog.md +++ b/docs/en/docs/ecosystem/external-table/multi-catalog.md @@ -77,6 +77,7 @@ This function will be used as a supplement and enhancement to the previous exter > 1. 
hive supports version 2.3.7 and above. > 2. Iceberg currently only supports V1 version, V2 version will be supported > soon. > 3. Hudi currently only supports Snapshot Query for Copy On Write tables and > Read Optimized Query for Merge On Read tables. In the future, Incremental > Query and Snapshot Query for Merge On Read tables will be supported soon. +> 4. Hive tables whose data is stored on Tencent CHDFS are supported; usage is the same as for common Hive tables. The following example is used to create a Catalog named hive to connect the specified Hive MetaStore, and provide the HDFS HA connection properties to access the corresponding files in HDFS. diff --git a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md index f348d97947..89e581a4a6 100644 --- a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md +++ b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md @@ -33,6 +33,7 @@ Hive External Table of Doris 提供了 Doris 直接访问 Hive 外部表的能 1. 支持 Hive 数据源接入Doris 2. 支持 Doris 与 Hive 数据源中的表联合查询,进行更加复杂的分析操作 3. 支持 访问开启 kerberos 的 Hive 数据源 +4. 支持 访问数据存储在腾讯 CHDFS 上的 Hive 数据源 本文档主要介绍该功能的使用方式和注意事项等。 diff --git a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md index 3627bb9221..e7ad61f766 100644 --- a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md +++ b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md @@ -77,6 +77,7 @@ under the License. > 1. hive 支持 2.3.7 以上版本。 > 2. Iceberg 目前仅支持 V1 版本,V2 版本即将支持。 > 3. Hudi 目前仅支持 Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Read > Optimized Query。后续将支持 Incremental Query 和 Merge On Read 表的 Snapshot Query。 +> 4. 
支持数据存储在腾讯 CHDFS 上的 hive 表,用法和普通 hive 一样。 以下示例,用于创建一个名为 hive 的 Catalog 连接指定的 Hive MetaStore,并提供了 HDFS HA 连接属性,用于访问对应的 HDFS 中的文件。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java index 84adeea719..28a679b9a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java @@ -38,6 +38,7 @@ import com.google.common.collect.Maps; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -128,6 +129,26 @@ public class BrokerMgr { } } + public FsBroker getAnyAliveBroker() { + lock.lock(); + try { + List<FsBroker> allBrokers = new ArrayList<>(); + for (List<FsBroker> list : brokerListMap.values()) { + allBrokers.addAll(list); + } + + Collections.shuffle(allBrokers); + for (FsBroker fsBroker : allBrokers) { + if (fsBroker.isAlive) { + return fsBroker; + } + } + } finally { + lock.unlock(); + } + return null; + } + public FsBroker getBroker(String brokerName, String host) throws AnalysisException { if (brokerName.equalsIgnoreCase(BrokerDesc.MULTI_LOAD_BROKER)) { return new FsBroker("127.0.0.1", 0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 41b0d1f770..38284b097e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -317,8 +317,7 @@ public class BrokerScanNode extends LoadScanNode { // Generate on broker scan range TBrokerScanRange brokerScanRange = new TBrokerScanRange(); brokerScanRange.setParams(params); - if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER - || brokerDesc.getStorageType() == 
StorageBackend.StorageType.OFS) { + if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { FsBroker broker = null; try { broker = Env.getCurrentEnv().getBrokerMgr() @@ -327,6 +326,12 @@ public class BrokerScanNode extends LoadScanNode { throw new UserException(e.getMessage()); } brokerScanRange.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.OFS) { + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + if (broker == null) { + throw new UserException("No alive broker."); + } + brokerScanRange.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); } else { brokerScanRange.setBrokerAddresses(new ArrayList<>()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index ace3693907..36bde10b7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -126,6 +126,8 @@ public class HiveScanProvider extends HMSTableScanProvider { return TFileType.FILE_HDFS; } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) { return TFileType.FILE_LOCAL; + } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) { + return TFileType.FILE_BROKER; } } throw new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index a7453394f6..b332819700 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -17,6 +17,8 @@ package org.apache.doris.planner.external; 
+import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -38,6 +40,7 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.Joiner; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; @@ -75,7 +78,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf { // set hdfs params for hdfs file type. Map<String, String> locationProperties = getLocationProperties(); - if (locationType == TFileType.FILE_HDFS) { + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { String fsName = ""; if (this instanceof TVFScanProvider) { fsName = ((TVFScanProvider) this).getFsName(); @@ -90,6 +93,14 @@ public abstract class QueryScanProvider implements FileScanProviderIf { THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); tHdfsParams.setFsName(fsName); context.params.setHdfsParams(tHdfsParams); + + if (locationType == TFileType.FILE_BROKER) { + FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker(); + if (broker == null) { + throw new UserException("No alive broker."); + } + context.params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port)); + } } else if (locationType == TFileType.FILE_S3) { context.params.setProperties(locationProperties); } @@ -108,6 +119,13 @@ public abstract class QueryScanProvider implements FileScanProviderIf { if (split instanceof IcebergSplit) { IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) split); } + + // file size of orc files is not correct get by FileSplit.getLength(), + // broker reader needs correct file size + if (locationType == TFileType.FILE_BROKER && 
fileFormatType == TFileFormatType.FORMAT_ORC) { + rangeDesc.setFileSize(((OrcSplit) fileSplit).getFileLength()); + } + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), @@ -172,12 +190,16 @@ public abstract class QueryScanProvider implements FileScanProviderIf { TFileRangeDesc rangeDesc = new TFileRangeDesc(); rangeDesc.setStartOffset(fileSplit.getStart()); rangeDesc.setSize(fileSplit.getLength()); + // fileSize only be used when format is orc or parquet and TFileType is broker + // When TFileType is other type, it is not necessary + rangeDesc.setFileSize(fileSplit.getLength()); rangeDesc.setColumnsFromPath(columnsFromPath); rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys); if (getLocationType() == TFileType.FILE_HDFS) { rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); - } else if (getLocationType() == TFileType.FILE_S3) { + } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() == TFileType.FILE_BROKER) { + // need full path rangeDesc.setPath(fileSplit.getPath().toString()); } return rangeDesc; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org