This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0b5b7175d67ec668103563272ab9ac89a613e2fa Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Thu Feb 29 15:30:29 2024 +0800 [fix](multi-catalog) add max compute custom odps and tunnel url (#31390) add max compute custom odps and tunnel url --- be/src/runtime/descriptors.cpp | 2 ++ be/src/runtime/descriptors.h | 4 +++ .../exec/format/table/max_compute_jni_reader.cpp | 2 ++ .../en/docs/lakehouse/multi-catalog/max-compute.md | 25 +++++++++++++ .../docs/lakehouse/multi-catalog/max-compute.md | 26 ++++++++++++++ .../doris/maxcompute/MaxComputeJniScanner.java | 5 ++- .../doris/maxcompute/MaxComputeTableScan.java | 37 ++++++++++++++----- .../maxcompute/MaxComputeExternalCatalog.java | 41 +++++++++++++++++----- .../maxcompute/MaxComputeExternalTable.java | 2 ++ .../property/constants/MCProperties.java | 2 ++ gensrc/thrift/Descriptors.thrift | 2 ++ 11 files changed, 131 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 0f6f4319189..e7056dd6001 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -197,6 +197,8 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde _region(tdesc.mcTable.region), _project(tdesc.mcTable.project), _table(tdesc.mcTable.table), + _odps_url(tdesc.mcTable.odps_url), + _tunnel_url(tdesc.mcTable.tunnel_url), _access_key(tdesc.mcTable.access_key), _secret_key(tdesc.mcTable.secret_key), _public_access(tdesc.mcTable.public_access) {} diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 4aa3d28e47d..fff1ed339d5 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -238,6 +238,8 @@ public: const std::string region() const { return _region; } const std::string project() const { return _project; } const std::string table() const { return _table; } + const std::string odps_url() const { return _odps_url; } + const std::string tunnel_url() const { return 
_tunnel_url; } const std::string access_key() const { return _access_key; } const std::string secret_key() const { return _secret_key; } const std::string public_access() const { return _public_access; } @@ -246,6 +248,8 @@ private: std::string _region; std::string _project; std::string _table; + std::string _odps_url; + std::string _tunnel_url; std::string _access_key; std::string _secret_key; std::string _public_access; diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp index 152b9571279..f7dd9c9846f 100644 --- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp +++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp @@ -66,6 +66,8 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des index++; } std::map<String, String> params = {{"region", _table_desc->region()}, + {"odps_url", _table_desc->odps_url()}, + {"tunnel_url", _table_desc->tunnel_url()}, {"access_key", _table_desc->access_key()}, {"secret_key", _table_desc->secret_key()}, {"project", _table_desc->project()}, diff --git a/docs/en/docs/lakehouse/multi-catalog/max-compute.md b/docs/en/docs/lakehouse/multi-catalog/max-compute.md index bc62d84358b..3facac2e9cb 100644 --- a/docs/en/docs/lakehouse/multi-catalog/max-compute.md +++ b/docs/en/docs/lakehouse/multi-catalog/max-compute.md @@ -61,4 +61,29 @@ Pay-as-you-go quota has limited concurrency and usage. For additional resources, Consistent with Hive Catalog, please refer to the **column type mapping** section in [Hive Catalog](./hive.md). +## User-defined service address + +The region property is specified to generate a default endpoint of public network. + +In addition to default endpoint addresses, Max Compute Catalog also supports custom service addresses in properties. 
+ +Use the following properties: +* `mc.odps_endpoint`: Max Compute Endpoint. +* `mc.tunnel_endpoint`: Tunnel Endpoint. Max Compute Catalog uses the Tunnel SDK to obtain data. + +For more information about Max Compute Endpoint and Tunnel Endpoint that are used in different regions and network connection modes, see [Endpoint](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints). + +For example: + +```sql +CREATE CATALOG mc PROPERTIES ( + "type" = "max_compute", + "mc.region" = "cn-beijing", + "mc.default.project" = "your-project", + "mc.access_key" = "ak", + "mc.secret_key" = "sk", + "mc.odps_endpoint" = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api", + "mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com" +); +``` diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md b/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md index 2d65950d270..9ba66a2e9e7 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md @@ -61,4 +61,30 @@ CREATE CATALOG mc PROPERTIES ( 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 +## 自定义服务地址 + +默认情况下,Max Compute Catalog根据region去默认生成公网的endpoint。 + +除了默认的endpoint地址外,Max Compute Catalog也支持在属性中自定义服务地址。 + +使用以下两个属性: +* `mc.odps_endpoint`:Max Compute Endpoint。 +* `mc.tunnel_endpoint`: Tunnel Endpoint,Max Compute Catalog使用Tunnel SDK获取数据。 + +Max Compute Endpoint和Tunnel Endpoint的配置请参见[各地域及不同网络连接方式下的Endpoint](https://help.aliyun.com/zh/maxcompute/user-guide/endpoints) + +示例: + +```sql +CREATE CATALOG mc PROPERTIES ( + "type" = "max_compute", + "mc.region" = "cn-beijing", + "mc.default.project" = "your-project", + "mc.access_key" = "ak", + "mc.secret_key" = "sk", + "mc.odps_endpoint" = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api", + "mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com" +); +``` + diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java index 2003286e1f7..a87446b14b7 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java @@ -58,6 +58,8 @@ public class MaxComputeJniScanner extends JniScanner { private static final String TABLE = "table"; private static final String ACCESS_KEY = "access_key"; private static final String SECRET_KEY = "secret_key"; + private static final String ODPS_URL = "odps_url"; + private static final String TUNNEL_URL = "tunnel_url"; private static final String START_OFFSET = "start_offset"; private static final String SPLIT_SIZE = "split_size"; private static final String PUBLIC_ACCESS = "public_access"; @@ -122,7 +124,8 @@ public class MaxComputeJniScanner extends JniScanner { String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'."); String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'."); boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false")); - return new MaxComputeTableScan(region, project, table, accessKey, secretKey, enablePublicAccess); + return new MaxComputeTableScan(params.get(ODPS_URL), params.get(TUNNEL_URL), region, project, table, + accessKey, secretKey, enablePublicAccess); } public String tableUniqKey() { diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java index c0fa40dae46..0de1cb17e79 100644 --- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java @@ -23,6 +23,7 @@ import com.aliyun.odps.TableSchema; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.tunnel.TableTunnel; import com.aliyun.odps.tunnel.TunnelException; +import org.apache.commons.lang3.StringUtils; import java.io.IOException; @@ -39,20 +40,40 @@ public class MaxComputeTableScan { private volatile long readRows = 0; private long totalRows = 0; - public MaxComputeTableScan(String region, String project, String table, + public MaxComputeTableScan(String odpsUrl, String tunnelUrl, String region, String project, String table, String accessKey, String secretKey, boolean enablePublicAccess) { this.project = project; this.table = table; odps = new Odps(new AliyunAccount(accessKey, secretKey)); - String odpsUrl = odpsUrlTemplate.replace("{}", region); - String tunnelUrl = tunnelUrlTemplate.replace("{}", region); - if (enablePublicAccess) { - odpsUrl = odpsUrl.replace("-inc", ""); - tunnelUrl = tunnelUrl.replace("-inc", ""); - } - odps.setEndpoint(odpsUrl); + setOdpsUrl(odpsUrl, region, enablePublicAccess); odps.setDefaultProject(this.project); tunnel = new TableTunnel(odps); + setTunnelUrl(tunnelUrl, region, enablePublicAccess); + } + + private void setOdpsUrl(String defaultOdpsUrl, String region, boolean enablePublicAccess) { + String odpsUrl; + if (StringUtils.isNotEmpty(defaultOdpsUrl)) { + odpsUrl = defaultOdpsUrl; + } else { + odpsUrl = odpsUrlTemplate.replace("{}", region); + if (enablePublicAccess) { + odpsUrl = odpsUrl.replace("-inc", ""); + } + } + odps.setEndpoint(odpsUrl); + } + + private void setTunnelUrl(String defaultTunnelUrl, String region, boolean enablePublicAccess) { + String tunnelUrl; + if (StringUtils.isNotEmpty(defaultTunnelUrl)) { + tunnelUrl = defaultTunnelUrl; + } else { + tunnelUrl = 
tunnelUrlTemplate.replace("{}", region); + if (enablePublicAccess) { + tunnelUrl = tunnelUrl.replace("-inc", ""); + } + } tunnel.setEndpoint(tunnelUrl); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java index d3539756dfc..6c875d0e29a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalCatalog.java @@ -35,6 +35,7 @@ import com.aliyun.odps.tunnel.TableTunnel; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Iterator; @@ -55,6 +56,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { private boolean enablePublicAccess; private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api"; private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com"; + private static String odpsUrl; + private static String tunnelUrl; private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of( MCProperties.REGION, MCProperties.PROJECT @@ -64,6 +67,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { String comment) { super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment); catalogProperty = new CatalogProperty(resource, props); + odpsUrl = props.getOrDefault(MCProperties.ODPS_ENDPOINT, ""); + tunnelUrl = props.getOrDefault(MCProperties.TUNNEL_SDK_ENDPOINT, ""); } @Override @@ -92,16 +97,28 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { Account account = new AliyunAccount(accessKey, secretKey); this.odps = new Odps(account); enablePublicAccess = 
Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false")); - String odpsUrl = odpsUrlTemplate.replace("{}", region); - if (enablePublicAccess) { - odpsUrl = odpsUrl.replace("-inc", ""); - } - odps.setEndpoint(odpsUrl); + setOdpsUrl(region); odps.setDefaultProject(defaultProject); tunnel = new TableTunnel(odps); - String tunnelUrl = tunnelUrlTemplate.replace("{}", region); - if (enablePublicAccess) { - tunnelUrl = tunnelUrl.replace("-inc", ""); + setTunnelUrl(region); + } + + private void setOdpsUrl(String region) { + if (StringUtils.isEmpty(odpsUrl)) { + odpsUrl = odpsUrlTemplate.replace("{}", region); + if (enablePublicAccess) { + odpsUrl = odpsUrl.replace("-inc", ""); + } + } + odps.setEndpoint(odpsUrl); + } + + private void setTunnelUrl(String region) { + if (StringUtils.isEmpty(tunnelUrl)) { + tunnelUrl = tunnelUrlTemplate.replace("{}", region); + if (enablePublicAccess) { + tunnelUrl = tunnelUrl.replace("-inc", ""); + } } tunnel.setEndpoint(tunnelUrl); } @@ -215,4 +232,12 @@ public class MaxComputeExternalCatalog extends ExternalCatalog { } } } + + public String getOdpsUrl() { + return odpsUrl; + } + + public String getTunnelUrl() { + return tunnelUrl; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 7cde59fd834..474b4e6cb8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -269,6 +269,8 @@ public class MaxComputeExternalTable extends ExternalTable { tMcTable.setRegion(mcCatalog.getRegion()); tMcTable.setAccessKey(mcCatalog.getAccessKey()); tMcTable.setSecretKey(mcCatalog.getSecretKey()); + tMcTable.setOdpsUrl(mcCatalog.getOdpsUrl()); + tMcTable.setTunnelUrl(mcCatalog.getTunnelUrl()); 
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess())); // use mc project as dbName tMcTable.setProject(dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java index 32c8534ace1..e3059cee4d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java @@ -31,6 +31,8 @@ public class MCProperties extends BaseProperties { public static final String SECRET_KEY = "mc.secret_key"; public static final String SESSION_TOKEN = "mc.session_token"; public static final String PUBLIC_ACCESS = "mc.public_access"; + public static final String ODPS_ENDPOINT = "mc.odps_endpoint"; + public static final String TUNNEL_SDK_ENDPOINT = "mc.tunnel_endpoint"; public static CloudCredential getCredential(Map<String, String> props) { return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index c720a402dd4..71af587d715 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -339,6 +339,8 @@ struct TMCTable { 4: optional string access_key 5: optional string secret_key 6: optional string public_access + 7: optional string odps_url + 8: optional string tunnel_url } // "Union" of all table types. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org