morningman commented on code in PR #27784: URL: https://github.com/apache/doris/pull/27784#discussion_r1413288385
########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -438,9 +469,85 @@ public List<Column> initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List<Column> columns) { + List<String> bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColums(bucketCols); Review Comment: ```suggestion int numBuckets = getBucketColumns(bucketCols); ``` ########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -438,9 +469,85 @@ public List<Column> initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List<Column> columns) { + List<String> bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColums(bucketCols); + if (bucketCols.isEmpty()) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, + "2")); + ImmutableList.Builder<Column> bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColum()", which will cause dead loop Review Comment: ```suggestion // do not use "getColumn()", which will cause dead loop ``` ########## fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java: ########## @@ -421,4 +428,37 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User } return compressType; } + + @Override + public DataPartition constructInputPartitionByDistributionInfo() { + if (hmsTable.isBucketedTable()) { + DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return DataPartition.RANDOM; + } + List<Column> distributeColumns = 
((HiveExternalDistributionInfo) distributionInfo).getDistributionColumns(); + List<Expr> dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return DataPartition.hashPartitioned(dataDistributeExprs, THashType.SPARK_MURMUR32); + } + + return DataPartition.RANDOM; + } + + public HMSExternalTable getHiveTable() { + return hmsTable; + } + + @Override + public THashType getHashType() { + if (hmsTable.isBucketedTable() Review Comment: Need to check if this table is created by Spark? ########## fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java: ########## @@ -76,10 +83,15 @@ public DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM); this.type = type; this.partitionExprs = ImmutableList.of(); + this.hashType = THashType.CRC32; + } + + public static DataPartition hashPartitioned(List<Expr> exprs, THashType hashType) { + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, hashType); } public static DataPartition hashPartitioned(List<Expr> exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, THashType.XXHASH64); Review Comment: Why using `THashType.XXHASH64` for this method? 
########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -438,9 +469,85 @@ public List<Column> initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List<Column> columns) { + List<String> bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColums(bucketCols); + if (bucketCols.isEmpty()) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, + "2")); + ImmutableList.Builder<Column> bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColum()", which will cause dead loop + for (Column column : columns) { + if (colName.equals(column.getName())) { + // For partition column, if it is string type, change it to varchar(65535) + // to be same as doris managed table. + // This is to avoid some unexpected behavior such as different partition pruning result + // between doris managed table and external table. 
+ if (column.getType().getPrimitiveType() == PrimitiveType.STRING) { + column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH)); + } + bucketColBuilder.add(column); + break; + } + } + } + + bucketColumns = bucketColBuilder.build(); + distributionInfo = new HiveExternalDistributionInfo(numBuckets, bucketColumns, bucketingVersion); + LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), name); + } + + private int getBucketColums(List<String> bucketCols) { + StorageDescriptor descriptor = remoteTable.getSd(); + int numBuckets = -1; + if (descriptor.isSetBucketCols() && !descriptor.getBucketCols().isEmpty()) { + /* Hive Bucketed Table */ + bucketCols.addAll(descriptor.getBucketCols()); + numBuckets = descriptor.getNumBuckets(); + } else if (remoteTable.isSetParameters() + && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { + Map<String, String> parameters = remoteTable.getParameters(); + for (String key : SUPPORTED_BUCKET_PROPERTIES) { + if (parameters.containsKey(key)) { + switch (key) { + case SPARK_BUCKET + "0": + bucketCols.add(0, parameters.get(key)); + break; + case SPARK_BUCKET + "1": + bucketCols.add(1, parameters.get(key)); + break; + case SPARK_BUCKET + "2": + bucketCols.add(2, parameters.get(key)); + break; + case SPARK_BUCKET + "3": + bucketCols.add(3, parameters.get(key)); + break; + case SPARK_BUCKET + "4": Review Comment: Can a Spark table really have at most 5 bucket columns?
########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -438,9 +469,85 @@ public List<Column> initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List<Column> columns) { + List<String> bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColums(bucketCols); + if (bucketCols.isEmpty()) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, + "2")); + ImmutableList.Builder<Column> bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColum()", which will cause dead loop + for (Column column : columns) { + if (colName.equals(column.getName())) { Review Comment: ```suggestion if (colName.equalsIgnoreCase(column.getName())) { ``` ########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -438,9 +469,85 @@ public List<Column> initSchema() { columns = tmpSchema; } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List<Column> columns) { + List<String> bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColums(bucketCols); + if (bucketCols.isEmpty()) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, + "2")); + ImmutableList.Builder<Column> bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColum()", which will cause dead loop + for (Column column : columns) { + if (colName.equals(column.getName())) { + // For partition column, if it is string type, change it to 
varchar(65535) Review Comment: ```suggestion // For partition/bucket column, if it is string type, change it to varchar(65535) ``` ########## fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java: ########## @@ -715,14 +835,23 @@ public long getDataSize(boolean singleReplica) { @Override public boolean isDistributionColumn(String columnName) { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + Set<String> distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase()); } @Override public Set<String> getDistributionColumnNames() { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + Set<String> distributionColumnNames = Sets.newHashSet(); + if (distributionInfo instanceof RandomDistributionInfo) { + return distributionColumnNames; + } + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List<Column> partitionColumns = hashDistributionInfo.getDistributionColumns(); Review Comment: ```suggestion List<Column> distColumns = hashDistributionInfo.getDistributionColumns(); ``` ########## fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java: ########## @@ -626,17 +639,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr leftRoot = leftRoot.getChild(0); } if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + List<Expr> 
rhsJoinExprs, Ref<THashType> hashType) { + HMSExternalTable leftTable = leftScanNode.getHiveTable(); + + DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); + if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo)) { + return false; + } + + HiveExternalDistributionInfo hiveDistributionInfo = (HiveExternalDistributionInfo) leftDistribution; + + List<Column> leftDistributeColumns = hiveDistributionInfo.getDistributionColumns(); + List<String> leftDistributeColumnNames = leftDistributeColumns.stream() + .map(col -> leftTable.getName() + "." + col.getName().toLowerCase()).collect(Collectors.toList()); + + List<String> leftJoinColumnNames = new ArrayList<>(); + List<Expr> rightExprs = new ArrayList<>(); + List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts(); + + for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { + Expr lhsJoinExpr = eqJoinPredicate.getChild(0); + Expr rhsJoinExpr = eqJoinPredicate.getChild(1); + if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef()); + if (leftSlot.getTable() instanceof HMSExternalTable + && leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) { + // table name in SlotRef is not the really name. `select * from test as t` + // table name in SlotRef is `t`, but here we need is `test`. + leftJoinColumnNames.add(leftSlot.getTable().getName() + "." 
+ + leftSlot.getColumnName().toLowerCase()); + rightExprs.add(rhsJoinExpr); + } + } + + //2 the join columns should contains all left table distribute columns to enable bucket shuffle join + for (int i = 0; i < leftDistributeColumnNames.size(); i++) { + String distributeColumnName = leftDistributeColumnNames.get(i); + boolean findRhsExprs = false; + // check the join column name is same as distribute column name and + // check the rhs join expr type is same as distribute column + for (int j = 0; j < leftJoinColumnNames.size(); j++) { + if (leftJoinColumnNames.get(j).equals(distributeColumnName)) { + // varchar and string type don't need to check the length property + if ((rightExprs.get(j).getType().isVarcharOrStringType() + && leftDistributeColumns.get(i).getType().isVarcharOrStringType()) + || (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) { + rhsJoinExprs.add(rightExprs.get(j)); + findRhsExprs = true; + break; + } + } + } + + if (!findRhsExprs) { + return false; + } + } + + hashType.value = THashType.SPARK_MURMUR32; Review Comment: Need to check if this table is created by Spark? ########## fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java: ########## @@ -2216,8 +2218,13 @@ private void computeScanRangeAssignment() throws Exception { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); } if (fragmentContainsBucketShuffleJoin) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); + if (scanNode instanceof OlapScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } else { Review Comment: else if (scanNode instanceof HiveScanNode) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org