This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 0e17cd4d92 [fix](hudi) use hudi api to split the COW table (#21385)
0e17cd4d92 is described below

commit 0e17cd4d9203ee1621baa5e75eb313a56d7e455d
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Sat Jul 1 08:35:33 2023 +0800

    [fix](hudi) use hudi api to split the COW table (#21385)

    Fix two bugs:

    1. COW & Read Optimized tables used the hive splitter to split files, but
       the hive splitter cannot recognize some of Hudi's internal bookkeeping
       files (e.g. under .hoodie/), which then fail in the parquet reader:

       ERROR 1105 (HY000): errCode = 2, detailMessage = (172.21.0.101)[CORRUPTION]Invalid magic number in parquet file, bytes read: 3035, file size: 3035, path: /usr/hive/warehouse/hudi.db/test/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight, read magic:

    2. A read optimized table created by Spark adds an empty partition key even
       if the table has no partitions, so we have to filter these empty
       partition keys out in the hive client:

       | test_ro | CREATE TABLE `test_ro`(
           `_hoodie_commit_time` string COMMENT '',
           ...
           `ts` bigint COMMENT '')
         PARTITIONED BY (
           `` string)
         ROW FORMAT SERDE
---
 .../doris/planner/external/HiveScanNode.java       |  5 +-
 .../doris/planner/external/hudi/HudiScanNode.java  | 71 +++++++++++-----------
 2 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index b8ac376307..738a2e3933 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -215,8 +215,9 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     public List<String> getPathPartitionKeys() {
-        return hmsTable.getRemoteTable().getPartitionKeys()
-                .stream().map(FieldSchema::getName).map(String::toLowerCase).collect(Collectors.toList());
+        return hmsTable.getRemoteTable().getPartitionKeys().stream()
+                .map(FieldSchema::getName).filter(partitionKey -> !"".equals(partitionKey))
+                .map(String::toLowerCase).collect(Collectors.toList());
     }
 
     @Override
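[Editor's note] The HiveScanNode change above drops the empty partition key
name that a Spark-created read optimized table reports before lowercasing the
rest. A minimal standalone sketch of the same filtering step, with hard-coded
key names standing in for what the Hive metastore would return:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PartitionKeyFilterSketch {
        public static void main(String[] args) {
            // Partition key names as a Hive metastore might report them; a Hudi
            // read optimized table created by Spark can include an empty name.
            List<String> partitionKeys = Arrays.asList("Dt", "", "Region");

            // Same idea as the getPathPartitionKeys() fix: drop empty names,
            // then normalize the rest to lower case.
            List<String> pathPartitionKeys = partitionKeys.stream()
                    .filter(partitionKey -> !"".equals(partitionKey))
                    .map(String::toLowerCase)
                    .collect(Collectors.toList());

            System.out.println(pathPartitionKeys); // prints [dt, region]
        }
    }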
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index fdb50e78a7..3c4fb0d1fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -56,7 +55,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -141,15 +139,6 @@ public class HudiScanNode extends HiveScanNode {
 
     @Override
     public List<Split> getSplits() throws UserException {
-        if (isCowTable) {
-            // skip hidden files start with "."
-            List<Split> cowSplits = super.getSplits().stream()
-                    .filter(split -> !((FileSplit) split).getPath().getName().startsWith("."))
-                    .collect(Collectors.toList());
-            noLogsSplitNum = cowSplits.size();
-            return cowSplits;
-        }
-
         HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
         hudiClient.reloadActiveTimeline();
         String basePath = hmsTable.getRemoteTable().getSd().getLocation();
@@ -207,32 +196,42 @@ public class HudiScanNode extends HiveScanNode {
 
                     HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
                             timeline, statuses.toArray(new FileStatus[0]));
-                    Iterator<FileSlice> hoodieFileSliceIterator = fileSystemView
-                            .getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).iterator();
-                    while (hoodieFileSliceIterator.hasNext()) {
-                        FileSlice fileSlice = hoodieFileSliceIterator.next();
-                        Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
-                        String filePath = baseFile.map(BaseFile::getPath).orElse("");
-                        long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
-
-                        List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString)
-                                .collect(Collectors.toList());
-                        if (logs.isEmpty()) {
+                    if (isCowTable) {
+                        fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
                             noLogsSplitNum++;
-                        }
-
-                        HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
-                                partition.getPartitionValues());
-                        split.setTableFormatType(TableFormatType.HUDI);
-                        split.setDataFilePath(filePath);
-                        split.setHudiDeltaLogs(logs);
-                        split.setInputFormat(inputFormat);
-                        split.setSerde(serdeLib);
-                        split.setBasePath(basePath);
-                        split.setHudiColumnNames(columnNames);
-                        split.setHudiColumnTypes(columnTypes);
-                        split.setInstantTime(queryInstant);
-                        splits.add(split);
+                            String filePath = baseFile.getPath();
+                            long fileSize = baseFile.getFileSize();
+                            FileSplit split = new FileSplit(new Path(filePath), 0, fileSize, fileSize, new String[0],
+                                    partition.getPartitionValues());
+                            splits.add(split);
+                        });
+                    } else {
+                        fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                                .forEach(fileSlice -> {
+                                    Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
+                                    String filePath = baseFile.map(BaseFile::getPath).orElse("");
+                                    long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+
+                                    List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                                            .map(Path::toString)
+                                            .collect(Collectors.toList());
+                                    if (logs.isEmpty()) {
+                                        noLogsSplitNum++;
+                                    }
+
+                                    HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
+                                            new String[0], partition.getPartitionValues());
+                                    split.setTableFormatType(TableFormatType.HUDI);
+                                    split.setDataFilePath(filePath);
+                                    split.setHudiDeltaLogs(logs);
+                                    split.setInputFormat(inputFormat);
+                                    split.setSerde(serdeLib);
+                                    split.setBasePath(basePath);
+                                    split.setHudiColumnNames(columnNames);
+                                    split.setHudiColumnTypes(columnTypes);
+                                    split.setInstantTime(queryInstant);
+                                    splits.add(split);
+                                });
                     }
                 }
             } catch (Exception e) {
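[Editor's note] For readers unfamiliar with the Hudi API used above: instead of
listing files through the hive splitter (which trips over bookkeeping files
under .hoodie/, hence the parquet magic-number error), the scan node now asks
Hudi's file system view for the latest base files as of a query instant. A
minimal sketch of that call pattern against a standalone meta client, assuming
a reachable COW table with at least one completed commit; basePath and
partitionName are placeholder values, and Doris itself obtains the client via
HiveMetaStoreClientHelper.getHudiClient(hmsTable) rather than building one:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

    public class CowBaseFileListingSketch {
        public static void main(String[] args) {
            // Placeholders: point these at a real Hudi COW table.
            String basePath = "hdfs://nameservice1/warehouse/hudi.db/test";
            String partitionName = ""; // relative partition path; "" if unpartitioned

            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                    .setConf(new Configuration()).setBasePath(basePath).build();

            // Only completed instants, so in-flight bookkeeping files are invisible.
            HoodieTimeline timeline = metaClient.getCommitsTimeline().filterCompletedInstants();
            String queryInstant = timeline.lastInstant().get().getTimestamp();

            // Latest base (parquet) file per file group as of the query instant.
            HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
            fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant)
                    .map(HoodieBaseFile::getPath)
                    .forEach(System.out::println);
        }
    }

Because the file system view resolves data files from the committed timeline
rather than from a raw directory listing, files such as the
00000000000000.deltacommit.inflight from the error message never reach the
parquet reader.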