morningman commented on code in PR #27784:
URL: https://github.com/apache/doris/pull/27784#discussion_r1413288385


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColums(bucketCols);

Review Comment:
   ```suggestion
           int numBuckets = getBucketColumns(bucketCols);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColums(bucketCols);
+        if (bucketCols.isEmpty()) {
+            bucketColumns = ImmutableList.of();
+            distributionInfo = new RandomDistributionInfo(1, true);
+            return;
+        }
+
+        int bucketingVersion = 
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+                "2"));
+        ImmutableList.Builder<Column> bucketColBuilder = 
ImmutableList.builder();
+        for (String colName : bucketCols) {
+            // do not use "getColum()", which will cause dead loop

Review Comment:
   ```suggestion
               // do not use "getColumn()", which will cause dead loop
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java:
##########
@@ -421,4 +428,37 @@ protected TFileCompressType getFileCompressType(FileSplit 
fileSplit) throws User
         }
         return compressType;
     }
+
+    @Override
+    public DataPartition constructInputPartitionByDistributionInfo() {
+        if (hmsTable.isBucketedTable()) {
+            DistributionInfo distributionInfo = 
hmsTable.getDefaultDistributionInfo();
+            if (!(distributionInfo instanceof HashDistributionInfo)) {
+                return DataPartition.RANDOM;
+            }
+            List<Column> distributeColumns = ((HiveExternalDistributionInfo) 
distributionInfo).getDistributionColumns();
+            List<Expr> dataDistributeExprs = Lists.newArrayList();
+            for (Column column : distributeColumns) {
+                SlotRef slotRef = new SlotRef(desc.getRef().getName(), 
column.getName());
+                dataDistributeExprs.add(slotRef);
+            }
+            return DataPartition.hashPartitioned(dataDistributeExprs, 
THashType.SPARK_MURMUR32);
+        }
+
+        return DataPartition.RANDOM;
+    }
+
+    public HMSExternalTable getHiveTable() {
+        return hmsTable;
+    }
+
+    @Override
+    public THashType getHashType() {
+        if (hmsTable.isBucketedTable()

Review Comment:
   Do we need to check whether this table was created by Spark?



##########
fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java:
##########
@@ -76,10 +83,15 @@ public DataPartition(TPartitionType type) {
         Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type 
== TPartitionType.RANDOM);
         this.type = type;
         this.partitionExprs = ImmutableList.of();
+        this.hashType = THashType.CRC32;
+    }
+
+    public static DataPartition hashPartitioned(List<Expr> exprs, THashType 
hashType) {
+        return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, 
hashType);
     }
 
     public static DataPartition hashPartitioned(List<Expr> exprs) {
-        return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
+        return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, 
THashType.XXHASH64);

Review Comment:
   Why does this method use `THashType.XXHASH64`?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColums(bucketCols);
+        if (bucketCols.isEmpty()) {
+            bucketColumns = ImmutableList.of();
+            distributionInfo = new RandomDistributionInfo(1, true);
+            return;
+        }
+
+        int bucketingVersion = 
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+                "2"));
+        ImmutableList.Builder<Column> bucketColBuilder = 
ImmutableList.builder();
+        for (String colName : bucketCols) {
+            // do not use "getColum()", which will cause dead loop
+            for (Column column : columns) {
+                if (colName.equals(column.getName())) {
+                    // For partition column, if it is string type, change it 
to varchar(65535)
+                    // to be same as doris managed table.
+                    // This is to avoid some unexpected behavior such as 
different partition pruning result
+                    // between doris managed table and external table.
+                    if (column.getType().getPrimitiveType() == 
PrimitiveType.STRING) {
+                        
column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
+                    }
+                    bucketColBuilder.add(column);
+                    break;
+                }
+            }
+        }
+
+        bucketColumns = bucketColBuilder.build();
+        distributionInfo = new HiveExternalDistributionInfo(numBuckets, 
bucketColumns, bucketingVersion);
+        LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), 
name);
+    }
+
+    private int getBucketColums(List<String> bucketCols) {
+        StorageDescriptor descriptor = remoteTable.getSd();
+        int numBuckets = -1;
+        if (descriptor.isSetBucketCols() && 
!descriptor.getBucketCols().isEmpty()) {
+            /* Hive Bucketed Table */
+            bucketCols.addAll(descriptor.getBucketCols());
+            numBuckets = descriptor.getNumBuckets();
+        } else if (remoteTable.isSetParameters()
+                && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, 
remoteTable.getParameters().keySet())) {
+            Map<String, String> parameters = remoteTable.getParameters();
+            for (String key : SUPPORTED_BUCKET_PROPERTIES) {
+                if (parameters.containsKey(key)) {
+                    switch (key) {
+                        case SPARK_BUCKET + "0":
+                            bucketCols.add(0, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "1":
+                            bucketCols.add(1, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "2":
+                            bucketCols.add(2, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "3":
+                            bucketCols.add(3, parameters.get(key));
+                            break;
+                        case SPARK_BUCKET + "4":

Review Comment:
   Can a Spark table have at most 5 bucket columns?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColums(bucketCols);
+        if (bucketCols.isEmpty()) {
+            bucketColumns = ImmutableList.of();
+            distributionInfo = new RandomDistributionInfo(1, true);
+            return;
+        }
+
+        int bucketingVersion = 
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+                "2"));
+        ImmutableList.Builder<Column> bucketColBuilder = 
ImmutableList.builder();
+        for (String colName : bucketCols) {
+            // do not use "getColum()", which will cause dead loop
+            for (Column column : columns) {
+                if (colName.equals(column.getName())) {

Review Comment:
   ```suggestion
                   if (colName.equalsIgnoreCase(column.getName())) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -438,9 +469,85 @@ public List<Column> initSchema() {
             columns = tmpSchema;
         }
         initPartitionColumns(columns);
+        initBucketingColumns(columns);
         return columns;
     }
 
+    private void initBucketingColumns(List<Column> columns) {
+        List<String> bucketCols = new ArrayList<>(5);
+        int numBuckets = getBucketColums(bucketCols);
+        if (bucketCols.isEmpty()) {
+            bucketColumns = ImmutableList.of();
+            distributionInfo = new RandomDistributionInfo(1, true);
+            return;
+        }
+
+        int bucketingVersion = 
Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION,
+                "2"));
+        ImmutableList.Builder<Column> bucketColBuilder = 
ImmutableList.builder();
+        for (String colName : bucketCols) {
+            // do not use "getColum()", which will cause dead loop
+            for (Column column : columns) {
+                if (colName.equals(column.getName())) {
+                    // For partition column, if it is string type, change it 
to varchar(65535)

Review Comment:
   ```suggestion
                       // For partition/bucket column, if it is string type, 
change it to varchar(65535)
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java:
##########
@@ -715,14 +835,23 @@ public long getDataSize(boolean singleReplica) {
 
     @Override
     public boolean isDistributionColumn(String columnName) {
-        return 
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
-            .collect(Collectors.toSet()).contains(columnName.toLowerCase());
+        Set<String> distributeColumns = getDistributionColumnNames()
+                .stream().map(String::toLowerCase).collect(Collectors.toSet());
+        return distributeColumns.contains(columnName.toLowerCase());
     }
 
     @Override
     public Set<String> getDistributionColumnNames() {
-        return 
getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
-            .collect(Collectors.toSet());
+        Set<String> distributionColumnNames = Sets.newHashSet();
+        if (distributionInfo instanceof RandomDistributionInfo) {
+            return distributionColumnNames;
+        }
+        HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) 
distributionInfo;
+        List<Column> partitionColumns = 
hashDistributionInfo.getDistributionColumns();

Review Comment:
   ```suggestion
           List<Column> distColumns = 
hashDistributionInfo.getDistributionColumns();
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java:
##########
@@ -626,17 +639,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, 
PlanFragment leftChildFr
                 leftRoot = leftRoot.getChild(0);
             }
             if (leftRoot instanceof OlapScanNode) {
-                return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
+                return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, 
rhsHashExprs);
+            } else if (leftRoot instanceof HiveScanNode) {
+                return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, 
rhsHashExprs, hashType);
             }
         }
 
         return false;
     }
 
+    private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode 
leftScanNode,
+                                         List<Expr> rhsJoinExprs, 
Ref<THashType> hashType) {
+        HMSExternalTable leftTable = leftScanNode.getHiveTable();
+
+        DistributionInfo leftDistribution = 
leftTable.getDefaultDistributionInfo();
+        if (leftDistribution == null || !(leftDistribution instanceof 
HiveExternalDistributionInfo)) {
+            return false;
+        }
+
+        HiveExternalDistributionInfo hiveDistributionInfo = 
(HiveExternalDistributionInfo) leftDistribution;
+
+        List<Column> leftDistributeColumns = 
hiveDistributionInfo.getDistributionColumns();
+        List<String> leftDistributeColumnNames = leftDistributeColumns.stream()
+                .map(col -> leftTable.getName() + "." + 
col.getName().toLowerCase()).collect(Collectors.toList());
+
+        List<String> leftJoinColumnNames = new ArrayList<>();
+        List<Expr> rightExprs = new ArrayList<>();
+        List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
+
+        for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
+            Expr lhsJoinExpr = eqJoinPredicate.getChild(0);
+            Expr rhsJoinExpr = eqJoinPredicate.getChild(1);
+            if (lhsJoinExpr.unwrapSlotRef() == null || 
rhsJoinExpr.unwrapSlotRef() == null) {
+                continue;
+            }
+
+            SlotRef leftSlot = 
node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef());
+            if (leftSlot.getTable() instanceof HMSExternalTable
+                    && 
leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) {
+                // table name in SlotRef is not the really name. `select * 
from test as t`
+                // table name in SlotRef is `t`, but here we need is `test`.
+                leftJoinColumnNames.add(leftSlot.getTable().getName() + "."
+                        + leftSlot.getColumnName().toLowerCase());
+                rightExprs.add(rhsJoinExpr);
+            }
+        }
+
+        //2 the join columns should contains all left table distribute columns 
to enable bucket shuffle join
+        for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
+            String distributeColumnName = leftDistributeColumnNames.get(i);
+            boolean findRhsExprs = false;
+            // check the join column name is same as distribute column name and
+            // check the rhs join expr type is same as distribute column
+            for (int j = 0; j < leftJoinColumnNames.size(); j++) {
+                if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
+                    // varchar and string type don't need to check the length 
property
+                    if ((rightExprs.get(j).getType().isVarcharOrStringType()
+                            && 
leftDistributeColumns.get(i).getType().isVarcharOrStringType())
+                            || 
(rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) {
+                        rhsJoinExprs.add(rightExprs.get(j));
+                        findRhsExprs = true;
+                        break;
+                    }
+                }
+            }
+
+            if (!findRhsExprs) {
+                return false;
+            }
+        }
+
+        hashType.value = THashType.SPARK_MURMUR32;

Review Comment:
   Do we need to check whether this table was created by Spark?



##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -2216,8 +2218,13 @@ private void computeScanRangeAssignment() throws 
Exception {
                 computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, 
assignedBytesPerHost, replicaNumPerHost);
             }
             if (fragmentContainsBucketShuffleJoin) {
-                
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) 
scanNode,
-                        idToBackend, addressToBackendID, replicaNumPerHost);
+                if (scanNode instanceof OlapScanNode) {
+                    
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) 
scanNode,
+                            idToBackend, addressToBackendID, 
replicaNumPerHost);
+                } else {

Review Comment:
   Suggest using `else if (scanNode instanceof HiveScanNode)` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to