This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 7147a7c290  [feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)

7147a7c290 is described below

commit 7147a7c2906bdc488732a8c742db59ff0adcfa25
Author: huangzhaowei <huangzhaowei....@bytedance.com>
AuthorDate: Thu Jul 21 17:38:53 2022 +0800

[feature-wip](multi-catalog) Support s3 storage for file scan node (#10977)

This is an example of an S3-backed hms_catalog:

```sql
CREATE CATALOG hms_catalog PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://localhost:9083",
    "AWS_ACCESS_KEY" = "your access key",
    "AWS_SECRET_KEY" = "your secret key",
    "AWS_ENDPOINT" = "s3 endpoint",
    "AWS_REGION" = "s3-region",
    "fs.s3a.paging.maximum" = "1000");
```

All of these properties are required.
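Once such a catalog exists, the Hive tables behind it can be queried like any other Doris catalog. The statements below are only an illustrative sketch; `hive_db` and `hive_tbl` are placeholder names, not objects created by this patch:

```sql
-- Switch to the external catalog and read a Hive table whose data lives on S3.
SWITCH hms_catalog;
USE hive_db;
SELECT * FROM hive_tbl LIMIT 10;

-- Or reference the table by its fully qualified name.
SELECT COUNT(*) FROM hms_catalog.hive_db.hive_tbl;
```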
---
 .gitignore                                          |  1 +
 be/src/io/file_factory.cpp                          | 52 ++++++++++++++++++++++
 be/src/io/file_factory.h                            | 14 ++++++
 be/src/vec/exec/file_arrow_scanner.cpp              | 10 +++--
 be/src/vec/exec/file_text_scanner.cpp               |  8 ++--
 bin/start_be.sh                                     | 10 +++++
 .../java/org/apache/doris/catalog/HiveTable.java    |  5 +++
 .../doris/datasource/DataSourceProperty.java        | 24 ++++++++++
 .../planner/external/ExternalFileScanNode.java      | 33 +++++++++-----
 .../planner/external/ExternalFileScanProvider.java  |  4 +-
 .../planner/external/ExternalHiveScanProvider.java  | 46 ++++++++++++++-----
 .../external/ExternalIcebergScanProvider.java       | 14 ++----
 gensrc/thrift/PlanNodes.thrift                      |  2 +
 13 files changed, 181 insertions(+), 42 deletions(-)

diff --git a/.gitignore b/.gitignore
index c8cf1a765b..e743211fac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,7 @@ package-lock.json
 # ignore project file
 .cproject
 .project
+.cache
 .settings/
 **/.idea/
 **/.vscode/
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 9937eaa6d5..46d19669b9 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -130,3 +130,55 @@ doris::Status doris::FileFactory::create_file_reader(
     }
     return Status::OK();
 }
+
+doris::Status doris::FileFactory::_new_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                   const TFileScanRangeParams& params,
+                                                   const doris::TFileRangeDesc& range,
+                                                   FileReader*& file_reader_ptr) {
+    doris::TFileType::type type = params.file_type;
+
+    if (type == TFileType::FILE_STREAM) {
+        return Status::InternalError("UnSupport UniquePtr For FileStream type");
+    }
+
+    switch (type) {
+    case TFileType::FILE_S3: {
+        file_reader_ptr = new BufferedReader(
+                profile, new S3Reader(params.properties, range.path, range.start_offset));
+        break;
+    }
+    case TFileType::FILE_HDFS: {
+        FileReader* hdfs_reader = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, range.path,
+                                                        range.start_offset, &hdfs_reader));
+        file_reader_ptr = new BufferedReader(profile, hdfs_reader);
+        break;
+    }
+    default:
+        return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
+    }
+
+    return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                     const TFileScanRangeParams& params,
+                                                     const doris::TFileRangeDesc& range,
+                                                     std::shared_ptr<FileReader>& file_reader) {
+    FileReader* file_reader_ptr;
+    RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+
+    return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, RuntimeProfile* profile,
+                                                     const TFileScanRangeParams& params,
+                                                     const doris::TFileRangeDesc& range,
+                                                     std::unique_ptr<FileReader>& file_reader) {
+    FileReader* file_reader_ptr;
+    RETURN_IF_ERROR(_new_file_reader(env, profile, params, range, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+
+    return Status::OK();
+}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index c1350d6682..c36b611f9e 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -48,6 +48,16 @@ public:
                                      const TBrokerRangeDesc& range, int64_t start_offset,
                                      std::shared_ptr<FileReader>& file_reader);
 
+    static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                     const TFileScanRangeParams& params,
+                                     const TFileRangeDesc& range,
+                                     std::unique_ptr<FileReader>& file_reader);
+
+    static Status create_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                     const TFileScanRangeParams& params,
+                                     const TFileRangeDesc& range,
+                                     std::shared_ptr<FileReader>& file_reader);
+
     static TFileType::type convert_storage_type(TStorageBackendType::type type) {
         switch (type) {
         case TStorageBackendType::LOCAL:
@@ -72,6 +82,10 @@ private:
                                    const std::map<std::string, std::string>& properties,
                                    const TBrokerRangeDesc& range, int64_t start_offset,
                                    FileReader*& file_reader);
+
+    static Status _new_file_reader(ExecEnv* env, RuntimeProfile* profile,
+                                   const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                                   FileReader*& file_reader);
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 65b4a032ad..bd1428e820 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -17,8 +17,11 @@
 
 #include "vec/exec/file_arrow_scanner.h"
 
+#include <memory>
+
 #include "exec/arrow/parquet_reader.h"
 #include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "io/hdfs_reader_writer.h"
 #include "runtime/descriptors.h"
 #include "vec/utils/arrow_column_to_doris_column.h"
@@ -54,10 +57,9 @@ Status FileArrowScanner::_open_next_reader() {
         }
         const TFileRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
-        FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
-                                                        range.start_offset, &hdfs_reader));
-        file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+                                                        range, file_reader));
         RETURN_IF_ERROR(file_reader->open());
         if (file_reader->size() == 0) {
             file_reader->close();
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index c7d95e45ab..593b78867f 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -28,6 +28,7 @@
 #include "exec/text_converter.hpp"
 #include "exprs/expr_context.h"
 #include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "io/hdfs_reader_writer.h"
 #include "util/types.h"
 #include "util/utf8_check.h"
@@ -152,11 +153,8 @@ Status FileTextScanner::_open_next_reader() {
 
 Status FileTextScanner::_open_file_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];
-
-    FileReader* hdfs_reader = nullptr;
-    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
-                                                    range.start_offset, &hdfs_reader));
-    _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
+    RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
+                                                    _cur_file_reader));
     return _cur_file_reader->open();
 }
 
diff --git a/bin/start_be.sh b/bin/start_be.sh
index 74e1b49c14..551a12e5c0 100755
--- a/bin/start_be.sh
+++ b/bin/start_be.sh
@@ -31,12 +31,17 @@ OPTS=$(getopt \
 eval set -- "$OPTS"
 
 RUN_DAEMON=0
+RUN_IN_AWS=0
 while true; do
     case "$1" in
         --daemon)
            RUN_DAEMON=1
            shift ;;
+        --aws)
+           RUN_IN_AWS=1
+           shift ;;
         --)
            shift
            break
@@ -170,6 +175,11 @@ else
     LIMIT="/bin/limit3 -c 0 -n 65536"
 fi
 
+## If you are not running in aws cloud, disable this env since https://github.com/aws/aws-sdk-cpp/issues/1410.
+if [ ${RUN_IN_AWS} -eq 0 ]; then
+    export AWS_EC2_METADATA_DISABLED=true
+fi
+
 if [ ${RUN_DAEMON} -eq 1 ]; then
     nohup $LIMIT ${DORIS_HOME}/lib/doris_be "$@" >> $LOG_DIR/be.out 2>&1 < /dev/null &
 else
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index c04017ef7b..e512f33c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -53,10 +53,15 @@ public class HiveTable extends Table {
     public static final String HIVE_TABLE = "table";
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
     public static final String HIVE_HDFS_PREFIX = "dfs.";
+    public static final String S3_FS_PREFIX = "fs.s3";
     public static final String S3_PROPERTIES_PREFIX = "AWS";
     public static final String S3_AK = "AWS_ACCESS_KEY";
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+    public static final String AWS_REGION = "AWS_REGION";
+    public static final String AWS_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
+    public static final String AWS_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
+    public static final String AWS_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
 
     public HiveTable() {
         super(TableType.HIVE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
index 1351e29a11..0d3e67c5fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceProperty.java
@@ -57,16 +57,40 @@ public class DataSourceProperty implements Writable {
         Map<String, String> s3Properties = Maps.newHashMap();
         if (properties.containsKey(HiveTable.S3_AK)) {
             s3Properties.put("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
+            s3Properties.put(HiveTable.S3_AK, properties.get(HiveTable.S3_AK));
         }
         if (properties.containsKey(HiveTable.S3_SK)) {
             s3Properties.put("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
+            s3Properties.put(HiveTable.S3_SK, properties.get(HiveTable.S3_SK));
         }
         if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
             s3Properties.put("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
+            s3Properties.put(HiveTable.S3_ENDPOINT, properties.get(HiveTable.S3_ENDPOINT));
+        }
+        if (properties.containsKey(HiveTable.AWS_REGION)) {
+            s3Properties.put("fs.s3a.endpoint.region", properties.get(HiveTable.AWS_REGION));
+            s3Properties.put(HiveTable.AWS_REGION, properties.get(HiveTable.AWS_REGION));
+        }
+        if (properties.containsKey(HiveTable.AWS_MAX_CONN_SIZE)) {
+            s3Properties.put("fs.s3a.connection.maximum", properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+            s3Properties.put(HiveTable.AWS_MAX_CONN_SIZE, properties.get(HiveTable.AWS_MAX_CONN_SIZE));
+        }
+        if (properties.containsKey(HiveTable.AWS_REQUEST_TIMEOUT_MS)) {
+            s3Properties.put("fs.s3a.connection.request.timeout", properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+            s3Properties.put(HiveTable.AWS_REQUEST_TIMEOUT_MS, properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
+        }
+        if (properties.containsKey(HiveTable.AWS_CONN_TIMEOUT_MS)) {
+            s3Properties.put("fs.s3a.connection.timeout", properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
+            s3Properties.put(HiveTable.AWS_CONN_TIMEOUT_MS, properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
         }
         s3Properties.put("fs.s3.impl.disable.cache", "true");
         s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
         s3Properties.put("fs.s3a.attempts.maximum", "2");
 
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(HiveTable.S3_FS_PREFIX)) {
+                s3Properties.put(entry.getKey(), entry.getValue());
+            }
+        }
         return s3Properties;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 78ef63f257..9fbb8fa3fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -74,7 +74,8 @@ import java.util.Random;
 import java.util.Set;
 
 /**
- * ExternalFileScanNode for the file access type of datasource, now only support hive,hudi and iceberg.
+ * ExternalFileScanNode for the file access type of datasource, now only support
+ * hive,hudi and iceberg.
  */
 public class ExternalFileScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(ExternalFileScanNode.class);
@@ -219,9 +220,11 @@ public class ExternalFileScanNode extends ExternalScanNode {
         if (scanProvider.getTableFormatType().equals(TFileFormatType.FORMAT_CSV_PLAIN)) {
             Map<String, String> serDeInfoParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
             String columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
-                    ? HIVE_DEFAULT_COLUMN_SEPARATOR : serDeInfoParams.get("field.delim");
+                    ? HIVE_DEFAULT_COLUMN_SEPARATOR
+                    : serDeInfoParams.get("field.delim");
             String lineDelimiter = Strings.isNullOrEmpty(serDeInfoParams.get("line.delim"))
-                    ? HIVE_DEFAULT_LINE_DELIMITER : serDeInfoParams.get("line.delim");
+                    ? HIVE_DEFAULT_LINE_DELIMITER
+                    : serDeInfoParams.get("line.delim");
 
             TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
             textParams.setLineDelimiterStr(lineDelimiter);
@@ -246,7 +249,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
             slotDescByName.put(column.getName(), slotDesc);
         }
 
-        // Hive table must extract partition value from path and hudi/iceberg table keep partition field in file.
+        // Hive table must extract partition value from path and hudi/iceberg table keep
+        // partition field in file.
         partitionKeys.addAll(scanProvider.getPathPartitionKeys());
         context.params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
         for (SlotDescriptor slot : desc.getSlots()) {
@@ -276,14 +280,14 @@
     // If fileFormat is not null, we use fileFormat instead of check file's suffix
     private void buildScanRange() throws UserException, IOException {
         scanRangeLocations = Lists.newArrayList();
-        InputSplit[] inputSplits = scanProvider.getSplits(conjuncts);
-        if (0 == inputSplits.length) {
+        List<InputSplit> inputSplits = scanProvider.getSplits(conjuncts);
+        if (inputSplits.isEmpty()) {
             return;
         }
-        inputSplitsNum = inputSplits.length;
+        inputSplitsNum = inputSplits.size();
 
-        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
-        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fullPath = ((FileSplit) inputSplits.get(0)).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits.get(0)).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         context.params.setFileType(scanProvider.getTableFileType());
         context.params.setFormatType(scanProvider.getTableFormatType());
@@ -292,6 +296,8 @@
             THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
             tHdfsParams.setFsName(fsName);
             context.params.setHdfsParams(tHdfsParams);
+        } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+            context.params.setProperties(hmsTable.getS3Properties());
         }
 
         TScanRangeLocations curLocations = newLocations(context.params);
@@ -312,6 +318,7 @@
                     + fileSplit.getPath() + " ( " + fileSplit.getStart() + ","
                     + fileSplit.getLength() + ")"
                     + " loaction: " + Joiner.on("|").join(split.getLocations()));
 
+            fileSplitStrategy.update(fileSplit);
             // Add a new location when it's can be split
             if (fileSplitStrategy.hasNext()) {
@@ -353,10 +360,15 @@
             FileSplit fileSplit, List<String> columnsFromPath)
             throws DdlException, MetaNotFoundException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
+
+        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+            rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        } else if (scanProvider.getTableFileType() == TFileType.FILE_S3) {
+            rangeDesc.setPath(fileSplit.getPath().toString());
+        }
         return rangeDesc;
     }
@@ -417,4 +429,3 @@
         return output.toString();
     }
 }
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
index ebdd7a768a..36e11d8845 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanProvider.java
@@ -37,11 +37,11 @@ import java.util.Map;
 public interface ExternalFileScanProvider {
     TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException;
 
-    TFileType getTableFileType();
+    TFileType getTableFileType() throws DdlException, MetaNotFoundException;
 
     String getMetaStoreUrl();
 
-    InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException;
+    List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException;
 
     Table getRemoteHiveTable() throws DdlException, MetaNotFoundException;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index c60ee3c211..849511b0bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -27,6 +27,7 @@ import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -39,6 +40,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -69,8 +71,16 @@
     }
 
     @Override
-    public TFileType getTableFileType() {
-        return TFileType.FILE_HDFS;
+    public TFileType getTableFileType() throws DdlException, MetaNotFoundException {
+        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        if (location != null && !location.isEmpty()) {
+            if (location.startsWith("s3a") || location.startsWith("s3n")) {
+                return TFileType.FILE_S3;
+            } else if (location.startsWith("hdfs:")) {
+                return TFileType.FILE_HDFS;
+            }
+        }
+        throw new DdlException("Unknown file type for hms table.");
     }
 
     @Override
@@ -79,34 +89,50 @@
     }
 
     @Override
-    public InputSplit[] getSplits(List<Expr> exprs)
+    public List<InputSplit> getSplits(List<Expr> exprs)
             throws IOException, UserException {
         String splitsPath = getRemoteHiveTable().getSd().getLocation();
         List<String> partitionKeys = getRemoteHiveTable().getPartitionKeys()
                 .stream().map(FieldSchema::getName).collect(Collectors.toList());
+        List<Partition> hivePartitions = new ArrayList<>();
 
         if (partitionKeys.size() > 0) {
             ExprNodeGenericFuncDesc hivePartitionPredicate = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
                     exprs, partitionKeys, hmsTable.getName());
             String metaStoreUris = getMetaStoreUrl();
-            List<Partition> hivePartitions = HiveMetaStoreClientHelper.getHivePartitions(
-                    metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate);
-            if (!hivePartitions.isEmpty()) {
-                splitsPath = hivePartitions.stream().map(x -> x.getSd().getLocation())
-                        .collect(Collectors.joining(","));
-            }
+            hivePartitions.addAll(HiveMetaStoreClientHelper.getHivePartitions(
+                    metaStoreUris, getRemoteHiveTable(), hivePartitionPredicate));
         }
 
         String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
         Configuration configuration = setConfiguration();
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        if (!hivePartitions.isEmpty()) {
+            return hivePartitions.parallelStream()
+                    .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
+                    .collect(Collectors.toList());
+        } else {
+            return getSplitsByPath(inputFormat, configuration, splitsPath);
+        }
+    }
+
+    private List<InputSplit> getSplitsByPath(
+            InputFormat<?, ?> inputFormat,
+            Configuration configuration,
+            String splitsPath) {
         JobConf jobConf = new JobConf(configuration);
         FileInputFormat.setInputPaths(jobConf, splitsPath);
-        return inputFormat.getSplits(jobConf, 0);
+        try {
+            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+            return Lists.newArrayList(splits);
+        } catch (IOException e) {
+            return new ArrayList<InputSplit>();
+        }
     }
+
     protected Configuration setConfiguration() {
         Configuration conf = new Configuration();
         Map<String, String> dfsProperties = hmsTable.getDfsProperties();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
index 83ddc86c4d..10fd85b242 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalIcebergScanProvider.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -56,7 +55,7 @@ public class ExternalIcebergScanProvider extends ExternalHiveScanProvider {
     public TFileFormatType getTableFormatType() throws DdlException, MetaNotFoundException {
         TFileFormatType type;
 
-        String icebergFormat = getRemoteHiveTable().getParameters()
+        String icebergFormat = getRemoteHiveTable().getParameters()
                 .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
         if (icebergFormat.equals("parquet")) {
             type = TFileFormatType.FORMAT_PARQUET;
@@ -69,12 +68,7 @@
     }
 
     @Override
-    public TFileType getTableFileType() {
-        return TFileType.FILE_HDFS;
-    }
-
-    @Override
-    public InputSplit[] getSplits(List<Expr> exprs) throws IOException, UserException {
+    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
         List<Expression> expressions = new ArrayList<>();
         for (Expr conjunct : exprs) {
             Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
@@ -88,7 +82,7 @@
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
         }
 
-        List<FileSplit> splits = new ArrayList<>();
+        List<InputSplit> splits = new ArrayList<>();
         for (FileScanTask task : scan.planFiles()) {
             for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
@@ -96,7 +90,7 @@
                         spitTask.start(), spitTask.length(), new String[0]));
             }
         }
-        return splits.toArray(new InputSplit[0]);
+        return splits;
     }
 
     private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b6d0a3b19a..54f50085a9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -237,6 +237,8 @@ struct TFileScanRangeParams {
     6: optional THdfsParams hdfs_params;
     7: optional TFileTextScanRangeParams text_params;
+    // properties for file such as s3 information
+    8: optional map<string, string> properties;
 }
 
 struct TFileRangeDesc {
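The start_be.sh change above also introduces an `--aws` switch. The behaviour sketched below is read directly from the diff (by default the script now exports `AWS_EC2_METADATA_DISABLED=true`, and `--aws` skips that export so the EC2 metadata endpoint stays reachable); the exact invocations are only an illustration:

```sh
# Outside of AWS: the EC2 metadata probe is disabled automatically.
sh bin/start_be.sh --daemon

# On an EC2 instance, keep the metadata endpoint enabled (e.g. for instance-profile credentials).
sh bin/start_be.sh --daemon --aws
```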
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org