dujl commented on code in PR #9752:
URL: https://github.com/apache/incubator-doris/pull/9752#discussion_r882594727
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java:
##########
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.external.hudi.HudiProperty;
+import org.apache.doris.external.hudi.HudiTable;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerRangeDesc;
+import org.apache.doris.thrift.TBrokerScanRangeParams;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.mortbay.log.Log;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Hudi scan node to query hudi table.
+ */
+public class HudiScanNode extends BrokerScanNode {
+    private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
+
+    private HudiTable hudiTable;
+    // partition column predicates of hive table
+    private List<ExprNodeDesc> hivePredicates = new ArrayList<>();
+    private ExprNodeGenericFuncDesc hivePartitionPredicate;
+    private List<ImportColumnDesc> parsedColumnExprList = new ArrayList<>();
+    private String hdfsUri;
+
+    private Table remoteHiveTable;
+
+    /* hudi table properties */
+    private String fileFormat;
+    private String inputFormatName;
+    private String basePath;
+    private List<String> partitionKeys = new ArrayList<>();
+    /* hudi table properties */
+
+    private List<TScanRangeLocations> scanRangeLocations;
+
+    public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
+            List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
+        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
+        this.hudiTable = (HudiTable) destTupleDesc.getTable();
+    }
+
+    public String getHdfsUri() {
+        return hdfsUri;
+    }
+
+    public List<ImportColumnDesc> getParsedColumnExprList() {
+        return parsedColumnExprList;
+    }
+
+    public String getFileFormat() {
+        return fileFormat;
+    }
+
+    public String getBasePath() {
+        return basePath;
+    }
+
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    /**
+     * super init will invoke initFileGroup, In initFileGroup will do
+     * 1, get hudi table from hive metastore
+     * 2, resolve hudi table type, query mode, table base path, partition columns information.
+     * 3. generate fileGroup
+     *
+     * @param analyzer analyzer
+     * @throws UserException when init failed.
+     */
+    @Override
+    public void init(Analyzer analyzer) throws UserException {
+        super.init(analyzer);
+        // init scan range params
+        initParams(analyzer);
+    }
+
+    @Override
+    public int getNumInstances() {
+        return scanRangeLocations.size();
+    }
+
+    @Override
+    protected void initFileGroup() throws UserException {
+        resolvHiveTable();
+        analyzeColumnFromPath();
+
+        HudiTable hudiTable = (HudiTable) desc.getTable();
+        fileGroups = Lists.newArrayList(
+                new BrokerFileGroup(hudiTable.getId(),
+                        "\t",
+                        "\n",
+                        getBasePath(),
+                        getFileFormat(),
+                        getPartitionKeys(),
+                        getParsedColumnExprList()));
+        brokerDesc = new BrokerDesc("HudiTableDesc", StorageBackend.StorageType.HDFS, hudiTable.getTableProperties());
+
+    }
+
+    /**
+     * Override this function just for skip parent's getFileStatus.
+     */
+    @Override
+    protected void getFileStatus() throws DdlException {
+        if (partitionKeys.size() > 0) {
+            extractHivePartitionPredicate();
+        }
+        // set fileStatusesList as empty, we do not need fileStatusesList
+        fileStatusesList = Lists.newArrayList();
+        filesAdded = 0;
+    }
+
+    @Override
+    public void finalize(Analyzer analyzer) throws UserException {
+        try {
+            ParamCreateContext context = getParamCreateContexts().get(0);
+            finalizeParams(context.slotDescByName, context.exprMap, context.params,
+                    context.srcTupleDescriptor, false, context.fileGroup.isNegative(), analyzer);
+        } catch (AnalysisException e) {
+            throw new UserException(e.getMessage());
+        }
+        try {
+            buildScanRange();
+        } catch (IOException e) {
+            LOG.error("Build scan range failed.", e);
+            throw new UserException("Build scan range failed.", e);
+        }
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    private void resolvHiveTable() throws DdlException {
+        this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(
+                hudiTable.getHmsDatabaseName(),
+                hudiTable.getHmsTableName(),
+                hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
+
+        this.inputFormatName = remoteHiveTable.getSd().getInputFormat();
+        this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(this.inputFormatName);
+        this.basePath = remoteHiveTable.getSd().getLocation();
+        for (FieldSchema fieldSchema : remoteHiveTable.getPartitionKeys()) {
+            this.partitionKeys.add(fieldSchema.getName());
+        }
+        Log.info("Hudi inputFileFormat is " + inputFormatName + ", basePath is " + this.basePath);
+    }
+
+    private void initParams(Analyzer analyzer) {
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        TBrokerScanRangeParams params = context.params;
+
+        Map<String, SlotDescriptor> slotDescByName = Maps.newHashMap();
+
+        List<Column> columns = hudiTable.getBaseSchema(false);
+        // init slot desc add expr map, also transform hadoop functions
+        for (Column column : columns) {
+            SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
+            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setIsNullable(true);
+            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
+            params.addToSrcSlotIds(slotDesc.getId().asInt());
+            slotDescByName.put(column.getName(), slotDesc);
+        }
+        context.slotDescByName = slotDescByName;
+    }
+
+
+    /**
+     * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive.
+     */
+    private void extractHivePartitionPredicate() throws DdlException {
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+                    it.next(), partitionKeys, hudiTable.getName());
+            if (hiveExpr != null) {
+                hivePredicates.add(hiveExpr);
+            }
+        }
+        int count = hivePredicates.size();
+        // combine all predicate by `and`
+        // compoundExprs must have at least 2 predicates
+        if (count >= 2) {
+            hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and");
+        } else if (count == 1) {
+            // only one predicate
+            hivePartitionPredicate = (ExprNodeGenericFuncDesc) hivePredicates.get(0);
+        } else {
+            // have no predicate, make a dummy predicate "1=1" to get all partitions
+            HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+                    new HiveMetaStoreClientHelper.ExprBuilder(hudiTable.getName());
+            hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+                    .val(TypeInfoFactory.intTypeInfo, 1)
+                    .pred("=", 2).build();
+        }
+    }
+
+    private InputSplit[] getSplits() throws UserException, IOException {
+        String splitsPath = basePath;
+        if (partitionKeys.size() > 0) {
+            extractHivePartitionPredicate();
+
+            String metaStoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+            List<Partition> hivePartitions =
+                    HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, remoteHiveTable, hivePartitionPredicate);
+            splitsPath = hivePartitions.stream()
+                    .map(x -> x.getSd().getLocation()).collect(Collectors.joining(","));
+        }
+
+
+        Configuration configuration = new Configuration();
+        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        // alway get fileSplits from inputformat,
+        // because all hoodie input format have UseFileSplitsFromInputFormat annotation
+        JobConf jobConf = new JobConf(configuration);
+        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
+        return inputSplits;

+    }
+
+    // If fileFormat is not null, we use fileFormat instead of check file's suffix
+    protected void buildScanRange() throws UserException, IOException {
+        scanRangeLocations = Lists.newArrayList();
+        InputSplit[] inputSplits = getSplits();
+        if (inputSplits.length == 0) {
+            return;
+        }
+
+        THdfsParams hdfsParams = new THdfsParams();
+        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fsName = fullPath.replace(filePath, "");
+        hdfsParams.setFsName(fsName);
+        Log.info("Hudi path's host is " + fsName);
+
+        TFileFormatType formatType = TFileFormatType.FORMAT_PARQUET;

Review Comment:
   Hudi has two types of files: base files and log files. Base files are Parquet and log files are Avro. In any case, the format type will be decided by the input format class name; currently it will only be Parquet.
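   For illustration only, a minimal sketch (not part of the PR) of how the format type could be derived from the Hudi input format class name, as described above. The Hudi class names are the upstream defaults; whether Doris exposes a non-Parquet TFileFormatType for log files is an assumption, so the sketch still falls back to Parquet in both branches.

   import org.apache.doris.thrift.TFileFormatType;

   // Hypothetical helper: maps the input format class registered in the Hive
   // metastore to the Doris file format enum. Parquet-only for now, matching
   // the behavior in buildScanRange().
   public class HudiFormatResolver {
       // Copy-on-write / read-optimized tables register this input format.
       private static final String COW_INPUT_FORMAT =
               "org.apache.hudi.hadoop.HoodieParquetInputFormat";
       // Merge-on-read snapshot tables register the realtime input format.
       private static final String MOR_INPUT_FORMAT =
               "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat";

       public static TFileFormatType resolve(String inputFormatName) {
           if (COW_INPUT_FORMAT.equals(inputFormatName)) {
               // Only parquet base files are read.
               return TFileFormatType.FORMAT_PARQUET;
           }
           if (MOR_INPUT_FORMAT.equals(inputFormatName)) {
               // Snapshot reads would also merge avro log files; until that is
               // supported, the scan still only produces parquet base file splits.
               return TFileFormatType.FORMAT_PARQUET;
           }
           throw new IllegalArgumentException("unsupported Hudi input format: " + inputFormatName);
       }
   }

   Something like formatType = HudiFormatResolver.resolve(inputFormatName) in buildScanRange() would keep the parquet-only assumption in one place and make it explicit when Avro log files become supported.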