morningman commented on code in PR #10402:
URL: https://github.com/apache/doris/pull/10402#discussion_r906899654


##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;

Review Comment:
   Use `optional` instead of `required` for this field.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot 
and partition slot.
+  1: required Types.TTupleId src_tuple_id;

Review Comment:
   Use `optional` instead of `required` for this field.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java:
##########
@@ -1686,49 +1688,55 @@ private PlanNode createScanNode(Analyzer analyzer, 
TableRef tblRef, SelectStmt s
             throws UserException {
         ScanNode scanNode = null;
 
-        switch (tblRef.getTable().getType()) {
-            case OLAP:
-                OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(),
-                        "OlapScanNode");
-                olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened());
-                scanNode = olapNode;
-                break;
-            case ODBC:
-                scanNode = new OdbcScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), (OdbcTable) tblRef.getTable());
-                break;
-            case MYSQL:
-                scanNode = new MysqlScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), (MysqlTable) tblRef.getTable());
-                break;
-            case SCHEMA:
-                scanNode = new SchemaScanNode(ctx.getNextNodeId(), 
tblRef.getDesc());
-                break;
-            case BROKER:
-                scanNode = new BrokerScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "BrokerScanNode",
-                        null, -1);
-                break;
-            case ELASTICSEARCH:
-                scanNode = new EsScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "EsScanNode");
-                break;
-            case HIVE:
-                scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "HiveScanNode",
-                        null, -1);
-                break;
-            case ICEBERG:
-                scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "IcebergScanNode",
-                        null, -1);
-                break;
-            case HUDI:
-                scanNode = new HudiScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "HudiScanNode",
-                        null, -1);
-                break;
-            default:
-                break;
-        }
-        if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode 
|| scanNode instanceof HiveScanNode) {
-            if (analyzer.enableInferPredicate()) {
-                PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), 
analyzer);
+        if (Config.enable_multi_catalog) {

Review Comment:
   Add `HMS_EXTERNAL_TABLE` to `TableType`.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, 
IOException {
         String filePath = ((FileSplit) 
inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : 
hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);
             List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
                     partitionKeys);
-            int numberOfColumnsFromFile = context.slotDescByName.size() - 
partitionValuesFromPath.size();
 
-            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, 
partitionValuesFromPath,
-                    numberOfColumnsFromFile);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath);
             rangeDesc.setHdfsParams(hdfsParams);
-            rangeDesc.setReadByColumnDef(true);
 
-            
curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
+            
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             Log.debug("Assign to backend " + 
curLocations.getLocations().get(0).getBackendId()
                     + " with table split: " +  fileSplit.getPath()
                     + " ( " + fileSplit.getStart() + "," + 
fileSplit.getLength() + ")");
 
             // Put the last file
-            if 
(curLocations.getScanRange().getBrokerScanRange().isSetRanges()) {
+            if 
(curLocations.getScanRange().getExtScanRange().getFileScanRange().isSetRanges())
 {

Review Comment:
   Doesn't this `if` condition always evaluate to `true`?



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot 
and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and 
all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;

Review Comment:
   Use `optional` instead of `required` for this field.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java:
##########
@@ -117,6 +117,15 @@ public List<String> getDbNames() {
         return dbNames;
     }
 
+    public DataSourceIf getExternalDatasource(String name) {
+        readLock();

Review Comment:
   `nameToCatalogs` is a `ConcurrentMap`, so there is no need to take the lock here.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -212,8 +214,54 @@ struct TEsScanRange {
   4: required i32 shard_id
 }
 
-struct TFileScanRange {
+struct TFileTextScanRangeParams {
+    3: optional i8 column_separator;
+    4: optional i8 line_delimiter;
+    // for multibytes separators
+    5: optional i32 column_separator_length = 1;
+    6: optional i32 line_delimiter_length = 1;
+    7: optional string column_separator_str;
+    8: optional string line_delimiter_str;
+}
+
+struct TFileScanSlotInfo {
+    1: required Types.TSlotId slot_id;
+    2: required bool is_file_slot;
+}
+
+struct TFileScanRangeParams {
+  // use src_tuple_id to get all slots from src table include both file slot 
and partition slot.
+  1: required Types.TTupleId src_tuple_id;
+  // num_of_columns_from_file can spilt the all_file_slot and 
all_partition_slot
+  2: required i32 num_of_columns_from_file;
+  // all selected slots which may compose from file and partiton value.
+  3: required list<TFileScanSlotInfo> required_slots;
+
+  4: optional TFileTextScanRangeParams text_params;
+}
+
+struct TFileRangeDesc {
+    1: required Types.TFileType file_type;
+    2: required TFileFormatType format_type;
+    // Path of this range
+    3: required string path;
+    // Offset of this file start
+    4: required i64 start_offset;
+    // Size of this range, if size = -1, this means that will read to then end 
of file
+    5: required i64 size;
+    // total size of the file
+    6: optional i64 file_size;
+
+    // columns parsed from file path should be after the columns read from file
+    7: optional list<string> columns_from_path;
 
+    8: optional THdfsParams hdfs_params;
+}
+
+// HDFS file scan range
+struct TFileScanRange {
+    1: required list<TFileRangeDesc> ranges

Review Comment:
   Use `optional` instead of `required` for this field.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, 
IOException {
         String filePath = ((FileSplit) 
inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : 
hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);
             List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
                     partitionKeys);
-            int numberOfColumnsFromFile = context.slotDescByName.size() - 
partitionValuesFromPath.size();
 
-            TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, 
partitionValuesFromPath,
-                    numberOfColumnsFromFile);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath);
             rangeDesc.setHdfsParams(hdfsParams);

Review Comment:
   `setHdfsParams` is already called in `createBrokerRangeDesc()`.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -265,6 +313,11 @@ struct TBrokerScanNode {
     4: optional list<Exprs.TExpr> pre_filter_exprs
 }
 
+struct TFileScanNode {
+    1: required Types.TTupleId tuple_id

Review Comment:
   Use `optional` instead of `required` for this field.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java:
##########
@@ -57,34 +57,22 @@ public class HMSExternalDataSource extends 
ExternalDataSource {
      * Default constructor for HMSExternalDataSource.
      */
     public HMSExternalDataSource(String name, Map<String, String> props) {
-        setName(name);
-        getDsProperty().setProperties(props);
-        setType("hms");
-    }
-
-    /**
-     * Hive metastore data source implementation.
-     *
-     * @param hiveMetastoreUris e.g. thrift://127.0.0.1:9083
-     */
-    public HMSExternalDataSource(long id, String name, String type, 
DataSourceProperty dsProperty,
-            String hiveMetastoreUris) throws DdlException {
-        this.id = id;
+        this.id = nextId.incrementAndGet();

Review Comment:
   Why generate a new id each time this is constructed?
   It would be better to pass the id in from the caller.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java:
##########
@@ -253,45 +264,40 @@ private void buildScanRange() throws UserException, 
IOException {
         String filePath = ((FileSplit) 
inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        List<String> partitionKeys = new ArrayList<>();
-        for (FieldSchema fieldSchema : 
hmsTable.getRemoteTable().getPartitionKeys()) {
-            partitionKeys.add(fieldSchema.getName());
-        }
+
 
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
 
             TScanRangeLocations curLocations = newLocations(context.params);

Review Comment:
   Are you creating a new `TScanRangeLocations` for every file here?
   That would produce far too many `TScanRangeLocations` objects.



##########
fe/fe-core/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1652,7 +1652,7 @@ public class Config extends ConfigBase {
      * Should be removed when this feature is ready.
      */
     @ConfField(mutable = false, masterOnly = true)
-    public static boolean enable_multi_catalog = false; // 1 min
+    public static boolean enable_multi_catalog = true; // 1 min

Review Comment:
   Please set this back to `false`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to