This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6e28d878b5 [fix](hudi) compatible with hudi spark configuration and support skip merge (#24067) 6e28d878b5 is described below commit 6e28d878b5b072d9430e1e60a9f85c593e49cc53 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Sep 11 19:54:59 2023 +0800 [fix](hudi) compatible with hudi spark configuration and support skip merge (#24067) Fix three bugs: 1. A Hudi slice may have only log files, so `new Path(filePath)` will throw errors. 2. Hive column names are lowercase only, so column names are matched in ignore-case mode. 3. Be compatible with [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options), so users can add `hoodie.datasource.merge.type=skip_merge` in catalog properties to skip merging log files. --- be/src/vec/exec/format/table/hudi_jni_reader.cpp | 7 ++- be/src/vec/exec/format/table/hudi_jni_reader.h | 1 + docs/en/docs/lakehouse/multi-catalog/hudi.md | 3 ++ docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md | 4 ++ .../java/org/apache/doris/hudi/HudiJniScanner.java | 16 +++--- .../org/apache/doris/hudi/BaseSplitReader.scala | 48 +++++++++++------- .../apache/doris/hudi/HoodieRecordIterator.scala | 5 +- .../doris/planner/external/hudi/HudiScanNode.java | 19 ++++--- .../data/external_table_p2/hive/test_hive_hudi.out | 25 ++++++++++ .../external_table_p2/hive/test_hive_hudi.groovy | 58 ++++++++++++++++++++++ 10 files changed, 151 insertions(+), 35 deletions(-) diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp b/be/src/vec/exec/format/table/hudi_jni_reader.cpp index 029135ac67..bd6b40f3f1 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp +++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp @@ -36,6 +36,7 @@ class Block; namespace doris::vectorized { +const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie."; const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf."; HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, @@ -67,7 +68,11 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params, // Use compatible hadoop client to read data for (auto& kv : _scan_params.properties) { - params[HADOOP_CONF_PREFIX + kv.first] = kv.second; + if (kv.first.starts_with(HOODIE_CONF_PREFIX)) { + params[kv.first] = kv.second; + } else { + params[HADOOP_CONF_PREFIX + kv.first] = kv.second; + } } _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params, diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h b/be/src/vec/exec/format/table/hudi_jni_reader.h index bf2dab943d..c0438e9328 100644 --- a/be/src/vec/exec/format/table/hudi_jni_reader.h +++ b/be/src/vec/exec/format/table/hudi_jni_reader.h @@ -46,6 +46,7 @@ class HudiJniReader : public GenericReader { ENABLE_FACTORY_CREATOR(HudiJniReader); public: + static const std::string HOODIE_CONF_PREFIX; static const std::string HADOOP_CONF_PREFIX; HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params, diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index 4c46ccb0e1..52892db2df 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -59,6 +59,9 @@ CREATE CATALOG hudi PROPERTIES ( Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md). +## Skip Merge +Spark will create a read optimized table with the `_ro` suffix when generating a hudi MOR table. Doris will skip the log files when reading a read optimized table. Doris determines whether a table is read optimized not by the `_ro` suffix, but by the hive inputformat. Users can check whether the inputformat of the cow/mor/read optimized tables is the same through the `SHOW CREATE TABLE` command. In addition, Doris supports adding hoodie related configurations to catalog properties, which are [...]
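As a usage sketch, the catalog below enables skip merge for every Hudi table it exposes, modeled on the regression test added by this commit; the catalog name `hudi_skip_merge` and the metastore URI are placeholders, while the `hudi_catalog` database and `partitioned_mor_rt` table come from that test.

```sql
-- Minimal sketch: apply skip_merge at the catalog level.
-- The catalog name and metastore URI are placeholders.
CREATE CATALOG hudi_skip_merge PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
    -- Spark Datasource read option: read MOR tables without merging log files,
    -- so results match the read optimized (_ro) view.
    'hoodie.datasource.merge.type' = 'skip_merge'
);

USE hudi_skip_merge.hudi_catalog;
-- The hive inputformat, not the `_ro` suffix, decides how Doris reads the table.
SHOW CREATE TABLE partitioned_mor_rt;
```

With this property set, a snapshot query on the `_rt` table returns the same rows as the `_ro` table, which is what the `qt_skip_merge` case in the new regression test verifies.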
+ ## Query Optimization Doris uses the parquet native reader to read the data files of the COW table, and uses the Java SDK (by calling hudi-bundle through JNI) to read the data files of the MOR table. In the `upsert` scenario, there may still remain base files that have not been updated in the MOR table, which can be read through the parquet native reader. Users can view the execution plan of hudi scan through the [explain](../../advanced/best-practice/query-analysis.md) command, where `hudiNativeReadSplits` indi [...] ``` diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index 228be87420..b619283cac 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -59,6 +59,10 @@ CREATE CATALOG hudi PROPERTIES ( 和 Hive Catalog 一致，可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 +## Skip Merge +Spark 在创建 hudi mor 表的时候，会创建 `_ro` 后缀的 read optimize 表，doris 读取 read optimize 表会跳过 log 文件的合并。doris 判定一个表是否为 read optimize 表并不是通过 `_ro` 后缀，而是通过 hive inputformat，用户可以通过 `SHOW CREATE TABLE` 命令观察 cow/mor/read optimize 表的 inputformat 是否相同。 +此外 doris 支持在 catalog properties 添加 hoodie 相关的配置，配置项兼容 [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options)。所以用户可以在 catalog properties 中添加 `hoodie.datasource.merge.type=skip_merge` 跳过合并 log 文件。 + ## 查询优化 Doris 使用 parquet native reader 读取 COW 表的数据文件，使用 Java SDK（通过JNI调用hudi-bundle） 读取 MOR 表的数据文件。在 upsert 场景下，MOR 依然会有数据文件没有被更新，这部分文件可以通过 parquet native reader读取，用户可以通过 [explain](../../advanced/best-practice/query-analysis.md) 命令查看 hudi scan 的执行计划，`hudiNativeReadSplits` 表示有多少 split 文件通过 parquet native reader 读取。 diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 417b338115..64c4fd70e7 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -86,6 +86,9 @@ public class HudiJniScanner extends JniScanner { static { int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4); + if (numThreads > 32) { + numThreads = Runtime.getRuntime().availableProcessors(); + } avroReadPool = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build()); LOG.info("Create " + numThreads + " daemon threads to load avro logs"); @@ -176,10 +179,15 @@ public class HudiJniScanner extends JniScanner { if (ugi != null) { recordIterator = ugi.doAs( (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader( - split).buildScanIterator(split.requiredFields(), new Filter[0])); + split).buildScanIterator(new Filter[0])); } else { recordIterator = new MORSnapshotSplitReader(split) - .buildScanIterator(split.requiredFields(), new Filter[0]); + .buildScanIterator(new Filter[0]); + } + if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
cachedResolvers.computeIfAbsent(Thread.currentThread().getId(), + threadId -> AVRO_RESOLVER_CACHE.get()); + AVRO_RESOLVER_CACHE.get().clear(); } } catch (Exception e) { LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e); @@ -189,10 +197,6 @@ public class HudiJniScanner extends JniScanner { } isKilled.set(true); executorService.shutdownNow(); - if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) { - cachedResolvers.computeIfAbsent(Thread.currentThread().getId(), - threadId -> AVRO_RESOLVER_CACHE.get()); - } getRecordReaderTimeNs += System.nanoTime() - startTime; }); try { diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index 5ba16a5e16..3c10f8a4cd 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -153,6 +153,8 @@ case class HoodieTableInformation(sparkSession: SparkSession, metaClient: HoodieTableMetaClient, timeline: HoodieTimeline, tableConfig: HoodieTableConfig, + resolvedTargetFields: Array[String], + tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) /** @@ -214,22 +216,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { * required to fetch table's Avro and Internal schemas */ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { - val schemaResolver = new TableSchemaResolver(tableInformation.metaClient) - val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) - val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is => - AvroInternalSchemaConverter.convert(is, namespace + "." + name) - } orElse { - specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) - } orElse { - split.schemaSpec.map(s => convertToAvroSchema(s, tableName)) - } getOrElse { - Try(schemaResolver.getTableAvroSchema) match { - case Success(schema) => schema - case Failure(e) => - throw new HoodieSchemaException("Failed to fetch schema from the table", e) - } - } - (avroSchema, tableInformation.internalSchemaOpt) + (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt) } protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema) @@ -280,13 +267,13 @@ abstract class BaseSplitReader(val split: HoodieSplit) { sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") } - def buildScanIterator(requiredColumns: Array[String], filters: Array[Filter]): Iterator[InternalRow] = { + def buildScanIterator(filters: Array[Filter]): Iterator[InternalRow] = { // NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES // *Appending* additional columns to the ones requested by the caller is not a problem, as those // will be eliminated by the caller's projection; // (!) 
Please note, however, that it's critical to avoid _reordering_ of the requested columns as this // will break the upstream projection - val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns) + val targetColumns: Array[String] = appendMandatoryColumns(tableInformation.resolvedTargetFields) // NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro // schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and // could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions @@ -663,11 +650,36 @@ object BaseSplitReader { None } } + val tableName = metaClient.getTableConfig.getTableName + val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName) + val avroSchema: Schema = internalSchemaOpt.map { is => + AvroInternalSchemaConverter.convert(is, namespace + "." + name) + } orElse { + specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema) + } orElse { + split.schemaSpec.map(s => convertToAvroSchema(s, tableName)) + } getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + throw new HoodieSchemaException("Failed to fetch schema from the table", e) + } + } + + // match column name in lower case + val colNames = internalSchemaOpt.map { internalSchema => + internalSchema.getAllColsFullName.asScala.map(f => f.toLowerCase -> f).toMap + } getOrElse { + avroSchema.getFields.asScala.map(f => f.name().toLowerCase -> f.name()).toMap + } + val resolvedTargetFields = split.requiredFields.map(field => colNames.getOrElse(field.toLowerCase, field)) HoodieTableInformation(sparkSession, metaClient, timeline, metaClient.getTableConfig, + resolvedTargetFields, + avroSchema, internalSchemaOpt) } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala index c564565535..6e2b7b31e5 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala @@ -98,8 +98,9 @@ class HoodieMORRecordIterator(config: Configuration, case split => mergeType match { case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => - val reader = fileReaders.requiredSchemaReaderSkipMerging - new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) + // val reader = fileReaders.requiredSchemaReaderSkipMerging + // new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) + throw new UnsupportedOperationException("Skip merge is optimized by native read") case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => val reader = pickBaseFileReader() diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index c92c46659e..328c7b0f19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -79,7 +79,7 @@ public class HudiScanNode extends HiveScanNode { private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); - private final boolean isCowTable; + private final boolean 
isCowOrRoTable; private final AtomicLong noLogsSplitNum = new AtomicLong(0); @@ -91,9 +91,10 @@ public class HudiScanNode extends HiveScanNode { */ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - isCowTable = hmsTable.isHoodieCowTable(); - if (isCowTable) { - LOG.debug("Hudi table {} can read as cow table", hmsTable.getName()); + isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals( + hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type")); + if (isCowOrRoTable) { + LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName()); } else { LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); } @@ -101,7 +102,7 @@ public class HudiScanNode extends HiveScanNode { @Override public TFileFormatType getFileFormatType() throws UserException { - if (isCowTable) { + if (isCowOrRoTable) { return super.getFileFormatType(); } else { // Use jni to read hudi table in BE @@ -124,7 +125,7 @@ public class HudiScanNode extends HiveScanNode { @Override protected Map<String, String> getLocationProperties() throws UserException { - if (isCowTable) { + if (isCowOrRoTable) { return super.getLocationProperties(); } else { // HudiJniScanner uses hadoop client to read data. @@ -291,7 +292,7 @@ public class HudiScanNode extends HiveScanNode { HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, timeline, statuses.toArray(new FileStatus[0])); - if (isCowTable) { + if (isCowOrRoTable) { fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); String filePath = baseFile.getPath(); @@ -312,7 +313,9 @@ public class HudiScanNode extends HiveScanNode { noLogsSplitNum.incrementAndGet(); } - HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, + // no base file, use log file to parse file type + String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath; + HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, new String[0], partition.getPartitionValues()); split.setTableFormatType(TableFormatType.HUDI); split.setDataFilePath(filePath); diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out b/regression-test/data/external_table_p2/hive/test_hive_hudi.out new file mode 100644 index 0000000000..a695d3cdb7 --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !optimize_table -- +20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + +-- !merge_on_read -- +20230801201335031 20230801201335031_0_1 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0 row_1 2021-01-01 0 bob v_0 toBeDel0 1 1000000 +20230801201335031 20230801201335031_1_1 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0 row_1 2011-11-11 1 bob v_1 toBeDel1 1 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + +-- !lowercase_column -- +row_1 2021-01-01 0 v_0 +row_1 2011-11-11 1 v_1 +row_2 2021-01-01 0 v_0 +row_4 2021-02-01 4 v_4 + +-- !skip_merge -- +20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 +20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000 +20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004 + diff --git a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy new file mode 100644 index 0000000000..abdd5b34dc --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_hudi", "p2,external,hive,hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_hive_hudi" + + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'hadoop.username'='hadoop', + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + + sql """use ${catalog_name}.hudi_catalog""" + // read optimize table with partition + qt_optimize_table """select * from partitioned_mor_ro order by rowid, versionid""" + // merge on read table with update + qt_merge_on_read """select * from partitioned_mor_rt order by rowid, versionid""" + // match column names in lower case + qt_lowercase_column """select RoWiD, PaRtiTionID, PrEComB, VerSIonID from partitioned_mor_rt order by rowid, versionid""" + + + // skip merging logs + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'hadoop.username'='hadoop', + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}', + 'hoodie.datasource.merge.type'='skip_merge' + ); + """ + // merge on read table with update, skip merging logs, so the result is the same as partitioned_mor_ro + qt_skip_merge """select * from partitioned_mor_rt order by rowid, versionid""" + + sql """drop catalog if exists ${catalog_name};""" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org