This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 719af36f138 [Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)
719af36f138 is described below

commit 719af36f1384892ae25de362c83ffac162569e60
Author: Ceng <441651...@qq.com>
AuthorDate: Mon Sep 23 14:09:36 2024 +0800

    [Cherry-Pick][Fix](lakesoul) Fix CVEs and some filter pushdown issues (#39044) (#40812)

    bp #39044

    Signed-off-by: dmetasoul01 <opensou...@dmetasoul.com>
    Co-authored-by: Xu Chen <xuchen-p...@users.noreply.github.com>
    Co-authored-by: dmetasoul01 <opensou...@dmetasoul.com>
---
 fe/be-java-extensions/lakesoul-scanner/pom.xml     |  74 +--
 .../apache/doris/lakesoul/LakeSoulJniScanner.java  |  61 ++-
 .../org/apache/doris/lakesoul/LakeSoulUtils.java   |  30 +-
 .../apache/doris/lakesoul/arrow/ArrowUtils.java    |  24 +-
 .../lakesoul/arrow/LakeSoulArrowJniScanner.java    |  58 ++-
 .../doris/lakesoul/parquet/ParquetFilter.java      | 288 -----
 fe/fe-core/pom.xml                                 |  69 ++-
 .../lakesoul/LakeSoulExternalCatalog.java          |  48 +-
 .../datasource/lakesoul/LakeSoulExternalTable.java |  26 +-
 .../doris/datasource/lakesoul/LakeSoulUtils.java   | 526 +++++++++++++++++++++
 .../lakesoul/source/LakeSoulScanNode.java          | 170 ++++-
 .../datasource/lakesoul/LakeSoulPredicateTest.java | 280 +++++
 fe/pom.xml                                         |   4 +-
 .../lakesoul/test_lakesoul_filter.groovy           |  74 +++
 .../lakesoul/test_external_table_lakesoul.groovy   |  24 +-
 15 files changed, 1265 insertions(+), 491 deletions(-)

diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml index cbbb473483e..4a1ae6b2e8a 100644 --- a/fe/be-java-extensions/lakesoul-scanner/pom.xml +++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml @@ -47,87 +47,19 @@ under the License. </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.arrow</groupId> - <artifactId>arrow-vector</artifactId> - <version>${arrow.version}</version> - </dependency> - <dependency> - <groupId>org.apache.arrow</groupId> - <artifactId>arrow-memory-unsafe</artifactId> - <version>${arrow.version}</version> - </dependency> - <dependency> - <groupId>org.apache.arrow</groupId> - <artifactId>arrow-c-data</artifactId> - <version>${arrow.version}</version> - </dependency> - - <!-- scala deps --> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.version}</version> - </dependency> <dependency> <groupId>com.dmetasoul</groupId> <artifactId>lakesoul-io-java</artifactId> - <version>2.5.4</version> + <version>${lakesoul.version}</version> + <classifier>shaded</classifier> <exclusions> <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.arrow</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.antlr</groupId> - <artifactId>antlr4-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> -
</exclusion> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> + <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> - - <dependency> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-column</artifactId> - <version>1.12.2</version> - </dependency> - </dependencies> <build> diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java index 3dfbff756db..bedef57d3b7 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java @@ -17,25 +17,30 @@ package org.apache.doris.lakesoul; -import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner; -import org.apache.doris.lakesoul.parquet.ParquetFilter; import com.dmetasoul.lakesoul.LakeSoulArrowReader; import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.lakesoul.shaded.com.fasterxml.jackson.core.type.TypeReference; +import com.lakesoul.shaded.com.fasterxml.jackson.databind.ObjectMapper; +import com.lakesoul.shaded.io.substrait.proto.Plan; +import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; +import com.lakesoul.shaded.org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { + private static final Logger LOG = LoggerFactory.getLogger(LakeSoulJniScanner.class); private final Map<String, String> params; @@ -60,6 +65,8 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { withAllocator(nativeIOReader.getAllocator()); nativeIOReader.setBatchSize(batchSize); + LOG.info("opening LakeSoulJniScanner with params={}", params); + // add files for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) { nativeIOReader.addFile(file); @@ -72,19 +79,43 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList())); } - Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON)); + String options = params.getOrDefault(LakeSoulUtils.OPTIONS, "{}"); + Map<String, String> optionsMap = new ObjectMapper().readValue( + options, new TypeReference<Map<String, String>>() {} + ); + String base64Predicate = optionsMap.get(LakeSoulUtils.SUBSTRAIT_PREDICATE); + if (base64Predicate != null) { + Plan predicate = SubstraitUtil.decodeBase64String(base64Predicate); + if (LOG.isDebugEnabled()) { + LOG.debug("push predicate={}", predicate); + } + nativeIOReader.addFilterProto(predicate); + } + + for (String key : 
LakeSoulUtils.OBJECT_STORE_OPTIONS) { + String value = optionsMap.get(key); + if (value != null) { + nativeIOReader.setObjectStoreOption(key, value); + } + } + + Schema tableSchema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON)); String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM); List<Field> requiredFields = new ArrayList<>(); for (String fieldName : requiredFieldNames) { - requiredFields.add(schema.findField(fieldName)); + String name = fieldName; + if (StringUtils.isEmpty(name)) { + continue; + } + requiredFields.add(tableSchema.findField(fieldName)); } requiredSchema = new Schema(requiredFields); nativeIOReader.setSchema(requiredSchema); - HashSet<String> partitionColumn = new HashSet<>(); + List<Field> partitionFields = new ArrayList<>(); for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "") .split(LakeSoulUtils.LIST_DELIM)) { if (partitionKV.isEmpty()) { @@ -94,17 +125,15 @@ public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { if (kv.length != 2) { throw new IllegalArgumentException("Invalid partition column = " + partitionKV); } - partitionColumn.add(kv[0]); + nativeIOReader.setDefaultColumnValue(kv[0], kv[1]); + partitionFields.add(tableSchema.findField(kv[0])); + } + if (!partitionFields.isEmpty()) { + nativeIOReader.setPartitionSchema(new Schema(partitionFields)); } initTableInfo(params); - for (ScanPredicate predicate : predicates) { - if (!partitionColumn.contains(predicate.columName)) { - nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString()); - } - } - nativeIOReader.initializeReader(); lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout); } diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java index 6c7f88f3ab3..ca07a81d0da 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java @@ -17,13 +17,29 @@ package org.apache.doris.lakesoul; +import java.util.Arrays; +import java.util.List; + public class LakeSoulUtils { - public static String FILE_NAMES = "file_paths"; - public static String PRIMARY_KEYS = "primary_keys"; - public static String SCHEMA_JSON = "table_schema"; - public static String PARTITION_DESC = "partition_descs"; - public static String REQUIRED_FIELDS = "required_fields"; + public static final String FILE_NAMES = "file_paths"; + public static final String PRIMARY_KEYS = "primary_keys"; + public static final String SCHEMA_JSON = "table_schema"; + public static final String PARTITION_DESC = "partition_descs"; + public static final String REQUIRED_FIELDS = "required_fields"; + public static final String OPTIONS = "options"; + public static final String SUBSTRAIT_PREDICATE = "substrait_predicate"; + public static final String LIST_DELIM = ";"; + public static final String PARTITIONS_KV_DELIM = "="; + + public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + public static final String FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; - public static String LIST_DELIM = ";"; - public static String PARTITIONS_KV_DELIM = "="; + public static final List<String> 
OBJECT_STORE_OPTIONS = Arrays.asList( + FS_S3A_ACCESS_KEY, + FS_S3A_SECRET_KEY, + FS_S3A_ENDPOINT, + FS_S3A_PATH_STYLE_ACCESS + ); } diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java index 3ad28ba783a..a3f9bc4788d 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java @@ -20,10 +20,10 @@ package org.apache.doris.lakesoul.arrow; import org.apache.doris.common.jni.utils.OffHeap; import org.apache.doris.common.jni.utils.TypeNativeBytes; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.util.Preconditions; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf; +import com.lakesoul.shaded.org.apache.arrow.util.Preconditions; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.ArrowType; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; import java.time.LocalDate; import java.time.LocalDateTime; @@ -40,7 +40,7 @@ public class ArrowUtils { LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC); OffHeap.putLong(null, address + offset, TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), - v.getMinute(), v.getSecond(), v.getNano() / 1000)); + v.getMinute(), v.getSecond(), v.getNano() / 1000)); offset += 8; } @@ -58,7 +58,7 @@ public class ArrowUtils { LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); OffHeap.putLong(null, address + offset, TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), - v.getMinute(), v.getSecond(), v.getNano() / 1000)); + v.getMinute(), v.getSecond(), v.getNano() / 1000)); offset += 8; } @@ -76,7 +76,7 @@ public class ArrowUtils { LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); OffHeap.putLong(null, address + offset, TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), - v.getMinute(), v.getSecond(), v.getNano() / 1000)); + v.getMinute(), v.getSecond(), v.getNano() / 1000)); offset += 8; } @@ -94,7 +94,7 @@ public class ArrowUtils { LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); OffHeap.putLong(null, address + offset, TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), - v.getMinute(), v.getSecond(), v.getNano() / 1000)); + v.getMinute(), v.getSecond(), v.getNano() / 1000)); offset += 8; } @@ -215,11 +215,9 @@ public class ArrowUtils { return hiveType.toString(); } - private static class ArrowTypeToHiveTypeConverter - implements ArrowType.ArrowTypeVisitor<String> { + private static class ArrowTypeToHiveTypeConverter implements ArrowType.ArrowTypeVisitor<String> { - private static final ArrowTypeToHiveTypeConverter INSTANCE = - new ArrowTypeToHiveTypeConverter(); + private static final ArrowTypeToHiveTypeConverter INSTANCE = new ArrowTypeToHiveTypeConverter(); @Override public String visit(ArrowType.Null type) { @@ -359,6 +357,4 @@ public class ArrowUtils { return "unsupported"; } } - - } diff --git 
a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java index 320d653a20a..b8fb8e92bd4 100644 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java @@ -23,28 +23,28 @@ import org.apache.doris.common.jni.vec.ColumnType; import org.apache.doris.common.jni.vec.ScanPredicate; import org.apache.doris.common.jni.vec.VectorTable; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.BitVector; -import org.apache.arrow.vector.DateDayVector; -import org.apache.arrow.vector.DecimalVector; -import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.TimeStampMicroTZVector; -import org.apache.arrow.vector.TimeStampMicroVector; -import org.apache.arrow.vector.TimeStampMilliTZVector; -import org.apache.arrow.vector.TimeStampMilliVector; -import org.apache.arrow.vector.TimeStampNanoTZVector; -import org.apache.arrow.vector.TimeStampNanoVector; -import org.apache.arrow.vector.TimeStampSecTZVector; -import org.apache.arrow.vector.TimeStampSecVector; -import org.apache.arrow.vector.TimeStampVector; -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.StructVector; -import org.apache.arrow.vector.types.pojo.Field; -import org.apache.arrow.vector.types.pojo.Schema; +import com.lakesoul.shaded.org.apache.arrow.memory.ArrowBuf; +import com.lakesoul.shaded.org.apache.arrow.memory.BufferAllocator; +import com.lakesoul.shaded.org.apache.arrow.vector.BitVector; +import com.lakesoul.shaded.org.apache.arrow.vector.DateDayVector; +import com.lakesoul.shaded.org.apache.arrow.vector.DecimalVector; +import com.lakesoul.shaded.org.apache.arrow.vector.FieldVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMicroVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampMilliVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampNanoVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecTZVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampSecVector; +import com.lakesoul.shaded.org.apache.arrow.vector.TimeStampVector; +import com.lakesoul.shaded.org.apache.arrow.vector.ValueVector; +import com.lakesoul.shaded.org.apache.arrow.vector.VarCharVector; +import com.lakesoul.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import com.lakesoul.shaded.org.apache.arrow.vector.complex.ListVector; +import com.lakesoul.shaded.org.apache.arrow.vector.complex.StructVector; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Field; +import com.lakesoul.shaded.org.apache.arrow.vector.types.pojo.Schema; import org.apache.log4j.Logger; import java.io.IOException; @@ -101,7 +101,7 @@ public class LakeSoulArrowJniScanner extends JniScanner { String[] requiredFields = new String[fields.size()]; for (int i = 0; i < 
fields.size(); i++) { columnTypes[i] = - ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i))); + ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i))); requiredFields[i] = fields.get(i).getName(); } predicates = new ScanPredicate[0]; @@ -116,8 +116,8 @@ public class LakeSoulArrowJniScanner extends JniScanner { String[] requiredFields = new String[fields.size()]; for (int i = 0; i < fields.size(); i++) { columnTypes[i] = - ColumnType.parseType(fields.get(i).getName(), - ArrowUtils.hiveTypeFromArrowField(fields.get(i))); + ColumnType.parseType(fields.get(i).getName(), + ArrowUtils.hiveTypeFromArrowField(fields.get(i))); requiredFields[i] = fields.get(i).getName(); } @@ -142,10 +142,8 @@ public class LakeSoulArrowJniScanner extends JniScanner { private Integer fillMetaAddressVector(int batchSize, ColumnType columnType, long metaAddress, Integer offset, ValueVector valueVector) { // nullMap - long - validityBuffer = - ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize, - valueVector.getField().isNullable()); + long validityBuffer = ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize, + valueVector.getField().isNullable()); extraOffHeap.add(validityBuffer); OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer); @@ -172,7 +170,7 @@ public class LakeSoulArrowJniScanner extends JniScanner { continue; } offset = fillMetaAddressVector(batchSize, columnType.getChildTypes().get(i), metaAddress, offset, - childrenVector); + childrenVector); } } else if (columnType.isStringType()) { diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java deleted file mode 100644 index 7d2820acd79..00000000000 --- a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java +++ /dev/null @@ -1,288 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.lakesoul.parquet; - -import org.apache.doris.common.jni.vec.ColumnType; -import org.apache.doris.common.jni.vec.ScanPredicate; - -import org.apache.parquet.filter2.predicate.FilterApi; -import org.apache.parquet.filter2.predicate.FilterPredicate; -import org.apache.parquet.io.api.Binary; - -public class ParquetFilter { - - public static FilterPredicate toParquetFilter(ScanPredicate predicate) { - ScanPredicate.FilterOp filterOp = predicate.op; - switch (filterOp) { - case FILTER_IN: - return convertIn(predicate); - case FILTER_NOT_IN: - return convertNotIn(predicate); - case FILTER_LESS: - return convertLess(predicate); - case FILTER_LARGER: - return convertLarger(predicate); - case FILTER_LESS_OR_EQUAL: - return convertLessOrEqual(predicate); - case FILTER_LARGER_OR_EQUAL: - return convertLargerOrEqual(predicate); - default: - break; - } - throw new RuntimeException("Unsupported ScanPredicate" + ScanPredicate.dump(new ScanPredicate[] {predicate})); - } - - private static FilterPredicate convertNotIn(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues(); - FilterPredicate resultPredicate = null; - for (ScanPredicate.PredicateValue predicateValue : predicateValues) { - if (resultPredicate == null) { - resultPredicate = makeNotEquals(colName, colType, predicateValue); - } else { - resultPredicate = FilterApi.and(resultPredicate, makeNotEquals(colName, colType, predicateValue)); - } - } - return resultPredicate; - } - - private static FilterPredicate convertIn(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues(); - FilterPredicate resultPredicate = null; - for (ScanPredicate.PredicateValue predicateValue : predicateValues) { - if (resultPredicate == null) { - resultPredicate = makeEquals(colName, colType, predicateValue); - } else { - resultPredicate = FilterApi.or(resultPredicate, makeEquals(colName, colType, predicateValue)); - } - } - return resultPredicate; - } - - private static FilterPredicate convertLarger(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; - return makeLarger(colName, colType, predicateValue); - } - - private static FilterPredicate convertLargerOrEqual(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; - return makeLargerOrEqual(colName, colType, predicateValue); - } - - private static FilterPredicate convertLess(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; - return makeLess(colName, colType, predicateValue); - } - - private static FilterPredicate convertLessOrEqual(ScanPredicate predicate) { - String colName = predicate.columName; - ColumnType.Type colType = predicate.type; - ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; - return makeLessOrEqual(colName, colType, predicateValue); - } - - private static FilterPredicate makeNotEquals(String colName, ColumnType.Type type, - ScanPredicate.PredicateValue value) { - switch (type) 
{ - case BOOLEAN: - return FilterApi.notEq(FilterApi.booleanColumn(colName), value.getBoolean()); - case TINYINT: - return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.notEq(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.notEq(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.notEq(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.notEq(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - } - - - private static FilterPredicate makeEquals(String colName, ColumnType.Type type, - ScanPredicate.PredicateValue value) { - switch (type) { - case BOOLEAN: - return FilterApi.eq(FilterApi.booleanColumn(colName), value.getBoolean()); - case TINYINT: - return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.eq(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.eq(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.eq(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.eq(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - } - - private static FilterPredicate makeLarger(String colName, ColumnType.Type type, - ScanPredicate.PredicateValue value) { - switch (type) { - case TINYINT: - return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.gt(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.gt(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.gt(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.gt(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - - } - - private static FilterPredicate makeLargerOrEqual(String colName, ColumnType.Type type, - ScanPredicate.PredicateValue value) { - switch (type) { - case TINYINT: - return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - 
return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.gtEq(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.gtEq(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.gtEq(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.gtEq(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - - } - - private static FilterPredicate makeLess(String colName, ColumnType.Type type, ScanPredicate.PredicateValue value) { - switch (type) { - case TINYINT: - return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.lt(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.lt(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.lt(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.lt(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - - } - - private static FilterPredicate makeLessOrEqual(String colName, ColumnType.Type type, - ScanPredicate.PredicateValue value) { - switch (type) { - case TINYINT: - return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getByte()); - case SMALLINT: - return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getShort()); - case INT: - return FilterApi.ltEq(FilterApi.intColumn(colName), value.getInt()); - case BIGINT: - return FilterApi.ltEq(FilterApi.longColumn(colName), value.getLong()); - case FLOAT: - return FilterApi.ltEq(FilterApi.floatColumn(colName), value.getFloat()); - case DOUBLE: - return FilterApi.ltEq(FilterApi.doubleColumn(colName), value.getDouble()); - case CHAR: - case VARCHAR: - case STRING: - return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); - case BINARY: - return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); - case ARRAY: - case MAP: - case STRUCT: - default: - throw new RuntimeException("Unsupported push_down_filter type value: " + type); - } - } -} diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index bac2346185d..92e576fb2e1 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -565,18 +565,77 @@ under the License. 
<artifactId>hadoop-auth</artifactId> </dependency> + <!-- lakesoul --> <dependency> <groupId>com.dmetasoul</groupId> - <artifactId>lakesoul-common</artifactId> - <version>2.5.4</version> - <classifier>shaded</classifier> + <artifactId>lakesoul-io-java</artifactId> + <version>${lakesoul.version}</version> <exclusions> <exclusion> - <groupId>*</groupId> + <groupId>io.netty</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-format</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.scala-lang</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.json4s</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>42.7.3</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.apache.iceberg</groupId> @@ -1220,4 +1279,4 @@ under the License. 
</extension> </extensions> </build> -</project> \ No newline at end of file +</project> diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java index dd8342ad660..b27b4c6fc0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java @@ -25,15 +25,19 @@ import org.apache.doris.datasource.property.PropertyConverter; import com.dmetasoul.lakesoul.meta.DBManager; import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; -import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; public class LakeSoulExternalCatalog extends ExternalCatalog { - private DBManager dbManager; + private static final Logger LOG = LogManager.getLogger(LakeSoulExternalCatalog.class); + + private DBManager lakesoulMetadataManager; private final Map<String, String> props; @@ -48,49 +52,47 @@ public class LakeSoulExternalCatalog extends ExternalCatalog { @Override protected List<String> listDatabaseNames() { initLocalObjectsImpl(); - return dbManager.listNamespaces(); + return lakesoulMetadataManager.listNamespaces(); } @Override public List<String> listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName); - List<String> tableNames = Lists.newArrayList(); - for (TableInfo item : tifs) { - tableNames.add(item.getTableName()); - } + List<String> tableNames = lakesoulMetadataManager.listTableNamesByNamespace(dbName); return tableNames; } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, tblName); - + TableInfo tableInfo = lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName); return null != tableInfo; } @Override protected void initLocalObjectsImpl() { - if (dbManager == null) { - if (props != null) { - if (props.containsKey(DBUtil.urlKey)) { - System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey)); - } - if (props.containsKey(DBUtil.usernameKey)) { - System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey)); - } - if (props.containsKey(DBUtil.passwordKey)) { - System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey)); - } + if (props != null) { + if (props.containsKey(DBUtil.urlKey)) { + System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey)); + } + if (props.containsKey(DBUtil.usernameKey)) { + System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey)); + } + if (props.containsKey(DBUtil.passwordKey)) { + System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey)); } - dbManager = new DBManager(); } + lakesoulMetadataManager = new DBManager(); } public TableInfo getLakeSoulTable(String dbName, String tblName) { makeSureInitialized(); - return dbManager.getTableInfoByNameAndNamespace(tblName, dbName); + return lakesoulMetadataManager.getTableInfoByNameAndNamespace(tblName, dbName); + } + + public List<PartitionInfo> listPartitionInfo(String tableId) { + makeSureInitialized(); + return lakesoulMetadataManager.getAllPartitionInfo(tableId); } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java index 46e8d1db47c..9dd2f4811e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java @@ -25,17 +25,23 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.ExternalAnalysisTask; import org.apache.doris.thrift.TLakeSoulTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; import com.google.common.collect.Lists; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.HashMap; @@ -45,11 +51,24 @@ import java.util.Optional; import java.util.stream.Collectors; public class LakeSoulExternalTable extends ExternalTable { - + private static final Logger LOG = LogManager.getLogger(LakeSoulExternalTable.class); public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6; + public final String tableId; + public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE); + TableInfo tableInfo = getLakeSoulTableInfo(); + if (tableInfo == null) { + throw new RuntimeException(String.format("LakeSoul table %s.%s does not exist", dbName, name)); + } + tableId = tableInfo.getTableId(); + } + + @Override + public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { + makeSureInitialized(); + return new ExternalAnalysisTask(info); } private Type arrowFiledToDorisType(Field field) { @@ -150,6 +169,7 @@ public class LakeSoulExternalTable extends ExternalTable { String tableSchema = tableInfo.getTableSchema(); DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); Schema schema; + LOG.info("tableSchema={}", tableSchema); try { schema = Schema.fromJSON(tableSchema); } catch (IOException e) { @@ -174,6 +194,10 @@ public class LakeSoulExternalTable extends ExternalTable { return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name); } + public List<PartitionInfo> listPartitionInfo() { + return ((LakeSoulExternalCatalog) catalog).listPartitionInfo(tableId); + } + public String tablePath() { return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java new file mode 100644 index 00000000000..fba74d4f978 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulUtils.java @@ -0,0 +1,526 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.Subquery; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ColumnRange; +import org.apache.doris.thrift.TExprOpcode; + +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import io.substrait.expression.Expression; +import io.substrait.extension.DefaultExtensionCatalog; +import io.substrait.type.Type; +import io.substrait.type.TypeCreator; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class LakeSoulUtils { + + public static final String FILE_NAMES = "file_paths"; + public static final String PRIMARY_KEYS = "primary_keys"; + public static final String SCHEMA_JSON = "table_schema"; + public static final String PARTITION_DESC = "partition_descs"; + public static final String REQUIRED_FIELDS = "required_fields"; + public static final String OPTIONS = "options"; + public static final String SUBSTRAIT_PREDICATE = "substrait_predicate"; + public static final String CDC_COLUMN = "lakesoul_cdc_change_column"; + public static final String LIST_DELIM = ";"; + public static final String PARTITIONS_KV_DELIM = "="; + public static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + public static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + public static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + public static final String FS_S3A_REGION = "fs.s3a.endpoint.region"; + public static final String 
FS_S3A_PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + + private static final OffsetDateTime EPOCH; + private static final LocalDate EPOCH_DAY; + + static { + EPOCH = Instant.ofEpochSecond(0L).atOffset(ZoneOffset.UTC); + EPOCH_DAY = EPOCH.toLocalDate(); + } + + public static List<PartitionInfo> applyPartitionFilters( + List<PartitionInfo> allPartitionInfo, + String tableName, + Schema partitionArrowSchema, + Map<String, ColumnRange> columnNameToRange + ) throws IOException { + + Expression conjunctionFilter = null; + for (Field field : partitionArrowSchema.getFields()) { + ColumnRange columnRange = columnNameToRange.get(field.getName()); + if (columnRange != null) { + Expression expr = columnRangeToSubstraitFilter(field, columnRange); + if (expr != null) { + if (conjunctionFilter == null) { + conjunctionFilter = expr; + } else { + conjunctionFilter = SubstraitUtil.and(conjunctionFilter, expr); + } + } + } + } + return SubstraitUtil.applyPartitionFilters( + allPartitionInfo, + partitionArrowSchema, + SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName) + ); + } + + public static Expression columnRangeToSubstraitFilter( + Field columnField, + ColumnRange columnRange + ) throws IOException { + Optional<RangeSet<ColumnBound>> rangeSetOpt = columnRange.getRangeSet(); + if (columnRange.hasConjunctiveIsNull() || !rangeSetOpt.isPresent()) { + return SubstraitUtil.CONST_TRUE; + } else { + RangeSet<ColumnBound> rangeSet = rangeSetOpt.get(); + if (rangeSet.isEmpty()) { + return SubstraitUtil.CONST_TRUE; + } else { + Expression conjunctionFilter = null; + for (Range range : rangeSet.asRanges()) { + Expression expr = rangeToSubstraitFilter(columnField, range); + if (expr != null) { + if (conjunctionFilter == null) { + conjunctionFilter = expr; + } else { + conjunctionFilter = SubstraitUtil.or(conjunctionFilter, expr); + } + } + } + return conjunctionFilter; + } + } + } + + public static Expression rangeToSubstraitFilter(Field columnField, Range range) throws IOException { + if (!range.hasLowerBound() && !range.hasUpperBound()) { + // Range.all() + return SubstraitUtil.CONST_TRUE; + } else { + Expression upper = SubstraitUtil.CONST_TRUE; + if (range.hasUpperBound()) { + String func = range.upperBoundType() == BoundType.OPEN ? "lt:any_any" : "lte:any_any"; + Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField); + Expression right = SubstraitUtil.anyToSubstraitLiteral( + SubstraitUtil.arrowFieldToSubstraitType(columnField), + ((ColumnBound) range.upperEndpoint()).getValue().getRealValue()); + upper = SubstraitUtil.makeBinary( + left, + right, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + func, + TypeCreator.NULLABLE.BOOLEAN + ); + } + Expression lower = SubstraitUtil.CONST_TRUE; + if (range.hasLowerBound()) { + String func = range.lowerBoundType() == BoundType.OPEN ? 
"gt:any_any" : "gte:any_any"; + Expression left = SubstraitUtil.arrowFieldToSubstraitField(columnField); + Expression right = SubstraitUtil.anyToSubstraitLiteral( + SubstraitUtil.arrowFieldToSubstraitType(columnField), + ((ColumnBound) range.lowerEndpoint()).getValue().getRealValue()); + lower = SubstraitUtil.makeBinary( + left, + right, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + func, + TypeCreator.NULLABLE.BOOLEAN + ); + } + return SubstraitUtil.and(upper, lower); + } + } + + public static io.substrait.proto.Plan getPushPredicate( + List<Expr> conjuncts, + String tableName, + Schema tableSchema, + Schema partitionArrowSchema, + Map<String, String> properties, + boolean incRead + ) throws IOException { + + Set<String> partitionColumn = + partitionArrowSchema + .getFields() + .stream() + .map(Field::getName) + .collect(Collectors.toSet()); + Expression conjunctionFilter = null; + String cdcColumn = properties.get(CDC_COLUMN); + if (cdcColumn != null && !incRead) { + conjunctionFilter = SubstraitUtil.cdcColumnMergeOnReadFilter(tableSchema.findField(cdcColumn)); + } + for (Expr expr : conjuncts) { + if (!isAllPartitionPredicate(expr, partitionColumn)) { + Expression predicate = convertToSubstraitExpr(expr, tableSchema); + if (predicate != null) { + if (conjunctionFilter == null) { + conjunctionFilter = predicate; + } else { + conjunctionFilter = SubstraitUtil.and(conjunctionFilter, predicate); + } + } + } + } + if (conjunctionFilter == null) { + return null; + } + return SubstraitUtil.substraitExprToProto(conjunctionFilter, tableName); + } + + public static boolean isAllPartitionPredicate(Expr predicate, Set<String> partitionColumns) { + if (predicate == null) { + return false; + } + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + return isAllPartitionPredicate(compoundPredicate.getChild(0), partitionColumns) + && isAllPartitionPredicate(compoundPredicate.getChild(1), partitionColumns); + } + // Make sure the col slot is always first + SlotRef slotRef = convertDorisExprToSlotRef(predicate.getChild(0)); + LiteralExpr literalExpr = convertDorisExprToLiteralExpr(predicate.getChild(1)); + if (slotRef == null || literalExpr == null) { + return false; + } + String colName = slotRef.getColumnName(); + return partitionColumns.contains(colName); + + } + + public static SlotRef convertDorisExprToSlotRef(Expr expr) { + SlotRef slotRef = null; + if (expr instanceof SlotRef) { + slotRef = (SlotRef) expr; + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof SlotRef) { + slotRef = (SlotRef) expr.getChild(0); + } + } + return slotRef; + } + + public static LiteralExpr convertDorisExprToLiteralExpr(Expr expr) { + LiteralExpr literalExpr = null; + if (expr instanceof LiteralExpr) { + literalExpr = (LiteralExpr) expr; + } else if (expr instanceof CastExpr) { + if (expr.getChild(0) instanceof LiteralExpr) { + literalExpr = (LiteralExpr) expr.getChild(0); + } + } + return literalExpr; + } + + public static Expression convertToSubstraitExpr(Expr predicate, Schema tableSchema) throws IOException { + if (predicate == null) { + return null; + } + if (predicate instanceof BoolLiteral) { + BoolLiteral boolLiteral = (BoolLiteral) predicate; + boolean value = boolLiteral.getValue(); + if (value) { + return SubstraitUtil.CONST_TRUE; + } else { + return SubstraitUtil.CONST_FALSE; + } + } + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + switch 
(compoundPredicate.getOp()) { + case AND: { + Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema); + if (left != null && right != null) { + return SubstraitUtil.and(left, right); + } else if (left != null) { + return left; + } else { + return right; + } + } + case OR: { + Expression left = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + Expression right = convertToSubstraitExpr(compoundPredicate.getChild(1), tableSchema); + if (left != null && right != null) { + return SubstraitUtil.or(left, right); + } + return null; + } + case NOT: { + Expression child = convertToSubstraitExpr(compoundPredicate.getChild(0), tableSchema); + if (child != null) { + return SubstraitUtil.not(child); + } + return null; + } + default: + return null; + } + } else if (predicate instanceof InPredicate) { + InPredicate inExpr = (InPredicate) predicate; + if (inExpr.contains(Subquery.class)) { + return null; + } + SlotRef slotRef = convertDorisExprToSlotRef(inExpr.getChild(0)); + if (slotRef == null) { + return null; + } + String colName = slotRef.getColumnName(); + Field field = tableSchema.findField(colName); + Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field); + + colName = field.getName(); + Type type = field.getType().accept( + new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable()) + ); + List<Expression.Literal> valueList = new ArrayList<>(); + for (int i = 1; i < inExpr.getChildren().size(); ++i) { + if (!(inExpr.getChild(i) instanceof LiteralExpr)) { + return null; + } + LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i); + Object value = extractDorisLiteral(type, literalExpr); + if (value == null) { + return null; + } + valueList.add(SubstraitUtil.anyToSubstraitLiteral(type, value)); + } + if (inExpr.isNotIn()) { + // not in + return SubstraitUtil.notIn(fieldRef, valueList); + } else { + // in + return SubstraitUtil.in(fieldRef, valueList); + } + } + return convertBinaryExpr(predicate, tableSchema); + } + + private static Expression convertBinaryExpr(Expr dorisExpr, Schema tableSchema) throws IOException { + TExprOpcode opcode = dorisExpr.getOpcode(); + // Make sure the col slot is always first + SlotRef slotRef = convertDorisExprToSlotRef(dorisExpr.getChild(0)); + LiteralExpr literalExpr = convertDorisExprToLiteralExpr(dorisExpr.getChild(1)); + if (slotRef == null || literalExpr == null) { + return null; + } + String colName = slotRef.getColumnName(); + Field field = tableSchema.findField(colName); + Expression fieldRef = SubstraitUtil.arrowFieldToSubstraitField(field); + + Type type = field.getType().accept( + new SubstraitUtil.ArrowTypeToSubstraitTypeConverter(field.isNullable()) + ); + Object value = extractDorisLiteral(type, literalExpr); + if (value == null) { + if (opcode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) { + return SubstraitUtil.makeUnary( + fieldRef, + DefaultExtensionCatalog.FUNCTIONS_COMPARISON, + "is_null:any", + TypeCreator.NULLABLE.BOOLEAN); + } else { + return null; + } + } + Expression literal = SubstraitUtil.anyToSubstraitLiteral( + type, + value + ); + + String namespace; + String func; + switch (opcode) { + case EQ: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "equal:any_any"; + break; + case EQ_FOR_NULL: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_null:any"; + break; + case NE: + namespace = 
DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "not_equal:any_any"; + break; + case GE: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "gte:any_any"; + break; + case GT: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "gt:any_any"; + break; + case LE: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "lte:any_any"; + break; + case LT: + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "lt:any_any"; + break; + case INVALID_OPCODE: + if (dorisExpr instanceof IsNullPredicate) { + if (((IsNullPredicate) dorisExpr).isNotNull()) { + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_not_null:any"; + } else { + namespace = DefaultExtensionCatalog.FUNCTIONS_COMPARISON; + func = "is_null:any"; + } + break; + } + return null; + default: + return null; + } + return SubstraitUtil.makeBinary(fieldRef, literal, namespace, func, TypeCreator.NULLABLE.BOOLEAN); + } + + public static Object extractDorisLiteral(Type type, LiteralExpr expr) { + + if (expr instanceof BoolLiteral) { + if (type instanceof Type.Bool) { + return ((BoolLiteral) expr).getValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof DateLiteral) { + DateLiteral dateLiteral = (DateLiteral) expr; + if (type instanceof Type.Date) { + if (dateLiteral.getType().isDatetimeV2() || dateLiteral.getType().isDatetime()) { + return null; + } + return (int) LocalDate.of((int) dateLiteral.getYear(), + (int) dateLiteral.getMonth(), + (int) dateLiteral.getDay()).toEpochDay(); + } + if (type instanceof Type.TimestampTZ || type instanceof Type.Timestamp) { + return dateLiteral.getLongValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof DecimalLiteral) { + DecimalLiteral decimalLiteral = (DecimalLiteral) expr; + if (type instanceof Type.Decimal) { + return decimalLiteral.getValue(); + } else if (type instanceof Type.FP64) { + return decimalLiteral.getDoubleValue(); + } + if (type instanceof Type.Str) { + return expr.getStringValue(); + } + } else if (expr instanceof FloatLiteral) { + FloatLiteral floatLiteral = (FloatLiteral) expr; + + if (floatLiteral.getType() == org.apache.doris.catalog.Type.FLOAT) { + return type instanceof Type.FP32 + || type instanceof Type.FP64 + || type instanceof Type.Decimal ? ((FloatLiteral) expr).getValue() : null; + } else { + return type instanceof Type.FP64 + || type instanceof Type.Decimal ? 
((FloatLiteral) expr).getValue() : null; + } + } else if (expr instanceof IntLiteral) { + if (type instanceof Type.I8 + || type instanceof Type.I16 + || type instanceof Type.I32 + || type instanceof Type.I64 + || type instanceof Type.FP32 + || type instanceof Type.FP64 + || type instanceof Type.Decimal + || type instanceof Type.Date + ) { + return expr.getRealValue(); + } + if (!expr.getType().isInteger32Type()) { + if (type instanceof Type.Time || type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) { + return expr.getLongValue(); + } + } + + } else if (expr instanceof StringLiteral) { + String value = expr.getStringValue(); + if (type instanceof Type.Str) { + return value; + } + if (type instanceof Type.Date) { + try { + return (int) ChronoUnit.DAYS.between( + EPOCH_DAY, + LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE)); + } catch (DateTimeParseException e) { + return null; + } + } + if (type instanceof Type.Timestamp || type instanceof Type.TimestampTZ) { + try { + return ChronoUnit.MICROS.between( + EPOCH, + OffsetDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME)); + } catch (DateTimeParseException e) { + return null; + } + } + } + return null; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java index 1779aeaca10..dd15ec310de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java @@ -22,9 +22,13 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; +import org.apache.doris.datasource.lakesoul.LakeSoulUtils; +import org.apache.doris.datasource.property.constants.MinioProperties; +import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -34,15 +38,26 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLakeSoulFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.DataFileInfo; import com.dmetasoul.lakesoul.meta.DataOperation; import com.dmetasoul.lakesoul.meta.LakeSoulOptions; +import com.dmetasoul.lakesoul.meta.entity.PartitionInfo; import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; -import com.lakesoul.shaded.com.alibaba.fastjson.JSON; -import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject; +import io.substrait.proto.Plan; +import lombok.SneakyThrows; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.io.IOException; 
import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -52,19 +67,55 @@ import java.util.stream.Collectors; public class LakeSoulScanNode extends FileQueryScanNode { - protected final LakeSoulExternalTable lakeSoulExternalTable; + private static final Logger LOG = LogManager.getLogger(LakeSoulScanNode.class); - protected final TableInfo table; + protected LakeSoulExternalTable lakeSoulExternalTable; + + String tableName; + + String location; + + String partitions; + + Schema tableArrowSchema; + + Schema partitionArrowSchema; + private Map<String, String> tableProperties; + + String readType; public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv); + } + + @Override + protected void doInitialize() throws UserException { + super.doInitialize(); lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable(); - table = lakeSoulExternalTable.getLakeSoulTableInfo(); + TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo(); + location = tableInfo.getTablePath(); + tableName = tableInfo.getTableName(); + partitions = tableInfo.getPartitions(); + readType = LakeSoulOptions.ReadType$.MODULE$.FULL_READ(); + try { + tableProperties = new ObjectMapper().readValue( + tableInfo.getProperties(), + new TypeReference<Map<String, String>>() {} + ); + tableArrowSchema = Schema.fromJSON(tableInfo.getTableSchema()); + List<Field> partitionFields = + DBUtil.parseTableInfoPartitions(partitions) + .rangeKeys + .stream() + .map(tableArrowSchema::findField).collect(Collectors.toList()); + partitionArrowSchema = new Schema(partitionFields); + } catch (IOException e) { + throw new UserException(e); + } } @Override protected TFileType getLocationType() throws UserException { - String location = table.getTablePath(); return getLocationType(location); } @@ -81,12 +132,12 @@ public class LakeSoulScanNode extends FileQueryScanNode { @Override protected List<String> getPathPartitionKeys() throws UserException { - return new ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys); + return new ArrayList<>(DBUtil.parseTableInfoPartitions(partitions).rangeKeys); } @Override protected TableIf getTargetTable() throws UserException { - return lakeSoulExternalTable; + return desc.getTable(); } @Override @@ -94,13 +145,21 @@ public class LakeSoulScanNode extends FileQueryScanNode { return lakeSoulExternalTable.getHadoopProperties(); } + @SneakyThrows @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}", rangeDesc); + } if (split instanceof LakeSoulSplit) { setLakeSoulParams(rangeDesc, (LakeSoulSplit) split); } } + public ExternalCatalog getCatalog() { + return lakeSoulExternalTable.getCatalog(); + } + public static boolean isExistHashPartition(TableInfo tif) { JSONObject tableProperties = JSON.parseObject(tif.getProperties()); if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()) @@ -111,13 +170,53 @@ public class LakeSoulScanNode extends FileQueryScanNode { } } - public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) { + public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) throws IOException { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value()); TLakeSoulFileDesc fileDesc = new 
TLakeSoulFileDesc(); fileDesc.setFilePaths(lakeSoulSplit.getPaths()); fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys()); fileDesc.setTableSchema(lakeSoulSplit.getTableSchema()); + + + JSONObject options = new JSONObject(); + Plan predicate = LakeSoulUtils.getPushPredicate( + conjuncts, + tableName, + tableArrowSchema, + partitionArrowSchema, + tableProperties, + readType.equals(LakeSoulOptions.ReadType$.MODULE$.INCREMENTAL_READ())); + if (predicate != null) { + options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate)); + } + Map<String, String> catalogProps = getCatalog().getProperties(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}", catalogProps); + } + + if (catalogProps.get(S3Properties.Env.ENDPOINT) != null) { + options.put(LakeSoulUtils.FS_S3A_ENDPOINT, catalogProps.get(S3Properties.Env.ENDPOINT)); + if (catalogProps.containsKey(MinioProperties.ENDPOINT)) { + // Use path style access for minio + options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "true"); + } else { + // use virtual hosted style access for all other s3 compatible storage services + options.put(LakeSoulUtils.FS_S3A_PATH_STYLE_ACCESS, "false"); + } + if (catalogProps.get(S3Properties.Env.ACCESS_KEY) != null) { + options.put(LakeSoulUtils.FS_S3A_ACCESS_KEY, catalogProps.get(S3Properties.Env.ACCESS_KEY)); + } + if (catalogProps.get(S3Properties.Env.SECRET_KEY) != null) { + options.put(LakeSoulUtils.FS_S3A_SECRET_KEY, catalogProps.get(S3Properties.Env.SECRET_KEY)); + } + if (catalogProps.get(S3Properties.Env.REGION) != null) { + options.put(LakeSoulUtils.FS_S3A_REGION, catalogProps.get(S3Properties.Env.REGION)); + } + } + + fileDesc.setOptions(JSON.toJSONString(options)); + fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc() .entrySet().stream().map(entry -> String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.toList())); @@ -126,24 +225,51 @@ public class LakeSoulScanNode extends FileQueryScanNode { } public List<Split> getSplits() throws UserException { + if (LOG.isDebugEnabled()) { + LOG.debug("getSplits with columnFilters={}", columnFilters); + LOG.debug("getSplits with columnNameToRange={}", columnNameToRange); + LOG.debug("getSplits with conjuncts={}", conjuncts); + } + + List<PartitionInfo> allPartitionInfo = lakeSoulExternalTable.listPartitionInfo(); + if (LOG.isDebugEnabled()) { + LOG.debug("allPartitionInfo={}", allPartitionInfo); + } + List<PartitionInfo> filteredPartitionInfo = allPartitionInfo; + try { + filteredPartitionInfo = + LakeSoulUtils.applyPartitionFilters( + allPartitionInfo, + tableName, + partitionArrowSchema, + columnNameToRange + ); + } catch (IOException e) { + throw new UserException(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("filteredPartitionInfo={}", filteredPartitionInfo); + } + DataFileInfo[] dataFileInfos = DataOperation.getTableDataInfo(filteredPartitionInfo); + List<Split> splits = new ArrayList<>(); Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = new LinkedHashMap<>(); - TableInfo tif = table; - DataFileInfo[] dfinfos = DataOperation.getTableDataInfo(table.getTableId()); - for (DataFileInfo pif : dfinfos) { - if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) - .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>()) - .add(pif.path()); + TableInfo tableInfo = lakeSoulExternalTable.getLakeSoulTableInfo(); + + for (DataFileInfo fileInfo : dataFileInfos) { + if
(isExistHashPartition(tableInfo) && fileInfo.file_bucket_id() != -1) { + splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>()) + .computeIfAbsent(fileInfo.file_bucket_id(), v -> new ArrayList<>()) + .add(fileInfo.path()); } else { - splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) + splitByRangeAndHashPartition.computeIfAbsent(fileInfo.range_partitions(), k -> new LinkedHashMap<>()) .computeIfAbsent(-1, v -> new ArrayList<>()) - .add(pif.path()); + .add(fileInfo.path()); } } List<String> pkKeys = null; - if (!table.getPartitions().equals(";")) { - pkKeys = Lists.newArrayList(table.getPartitions().split(";")[1].split(",")); + if (!tableInfo.getPartitions().equals(";")) { + pkKeys = Lists.newArrayList(tableInfo.getPartitions().split(";")[1].split(",")); } for (Map.Entry<String, Map<Integer, List<String>>> entry : splitByRangeAndHashPartition.entrySet()) { @@ -161,7 +287,7 @@ public class LakeSoulScanNode extends FileQueryScanNode { split.getValue(), pkKeys, rangeDesc, - table.getTableSchema(), + tableInfo.getTableSchema(), 0, 0, 0, new String[0], null); lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL); @@ -169,8 +295,6 @@ public class LakeSoulScanNode extends FileQueryScanNode { } } return splits; - } - } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java new file mode 100644 index 00000000000..016819a382b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/lakesoul/LakeSoulPredicateTest.java @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
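The new unit test below enumerates, for every pairing of column type and literal type, whether LakeSoulUtils.convertToSubstraitExpr yields a Substrait expression; a null return means Doris keeps the filter and evaluates it locally instead of pushing it down. The call pattern under test, in minimal form; this reuses the test's own imports and the Arrow schema built in before(), and the field and literal shown are just one cell of the expects matrix:

    SlotRef col = new SlotRef(new TableName(), "c_int");
    BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, col, new IntLiteral(1, Type.INT));
    Expression pushed = LakeSoulUtils.convertToSubstraitExpr(eq, schema);  // throws IOException; null => not pushable
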
+ +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.BoolLiteral; +import org.apache.doris.analysis.CompoundPredicate; +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.analysis.DateLiteral; +import org.apache.doris.analysis.DecimalLiteral; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FloatLiteral; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IntLiteral; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; + +import com.dmetasoul.lakesoul.lakesoul.io.substrait.SubstraitUtil; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import io.substrait.expression.Expression; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class LakeSoulPredicateTest { + + public static Schema schema; + + @BeforeClass + public static void before() throws AnalysisException, IOException { + schema = new Schema( + Arrays.asList( + new Field("c_int", FieldType.nullable(new ArrowType.Int(32, true)), null), + new Field("c_long", FieldType.nullable(new ArrowType.Int(64, true)), null), + new Field("c_bool", FieldType.nullable(new ArrowType.Bool()), null), + new Field("c_float", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null), + new Field("c_double", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null), + new Field("c_dec", FieldType.nullable(new ArrowType.Decimal(20, 10)), null), + new Field("c_date", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null), + new Field("c_ts", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), null), + new Field("c_str", FieldType.nullable(new ArrowType.Utf8()), null) + )); + } + + @Test + public void testBinaryPredicate() throws AnalysisException, IOException { + List<LiteralExpr> literalList = new ArrayList<LiteralExpr>() {{ + add(new BoolLiteral(true)); + add(new DateLiteral("2023-01-02", Type.DATEV2)); + add(new DateLiteral("2024-01-02 12:34:56.123456", Type.DATETIMEV2)); + add(new DecimalLiteral(new BigDecimal("1.23"))); + add(new FloatLiteral(1.23, Type.FLOAT)); + add(new FloatLiteral(3.456, Type.DOUBLE)); + add(new IntLiteral(1, Type.TINYINT)); + add(new IntLiteral(1, Type.SMALLINT)); + add(new IntLiteral(1, Type.INT)); + add(new IntLiteral(1, Type.BIGINT)); + add(new StringLiteral("abc")); + add(new StringLiteral("2023-01-02")); + add(new StringLiteral("2023-01-02 01:02:03.456789")); + }}; + + List<SlotRef> slotRefs = new ArrayList<SlotRef>() {{ + add(new SlotRef(new TableName(), "c_int")); + add(new SlotRef(new TableName(), "c_long")); + add(new SlotRef(new 
TableName(), "c_bool")); + add(new SlotRef(new TableName(), "c_float")); + add(new SlotRef(new TableName(), "c_double")); + add(new SlotRef(new TableName(), "c_dec")); + add(new SlotRef(new TableName(), "c_date")); + add(new SlotRef(new TableName(), "c_ts")); + add(new SlotRef(new TableName(), "c_str")); + }}; + + // true indicates support for pushdown + Boolean[][] expects = new Boolean[][] { + { // int + false, false, false, false, false, false, true, true, true, true, false, false, false + }, + { // long + false, false, false, false, false, false, true, true, true, true, false, false, false + }, + { // boolean + true, false, false, false, false, false, false, false, false, false, false, false, false + }, + { // float + false, false, false, false, true, false, true, true, true, true, false, false, false + }, + { // double + false, false, false, true, true, true, true, true, true, true, false, false, false + }, + { // decimal + false, false, false, true, true, true, true, true, true, true, false, false, false + }, + { // date + false, true, false, false, false, false, true, true, true, true, false, true, false + }, + { // timestamp + false, true, true, false, false, false, false, false, false, true, false, false, false + }, + { // string + true, true, true, true, false, false, false, false, false, false, true, true, true + } + }; + + ArrayListMultimap<Boolean, Expr> validPredicateMap = ArrayListMultimap.create(); + + // binary predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List<Boolean> ret = literalList.stream().map(literal -> { + BinaryPredicate expr = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRefs.get(loc), literal); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // in predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List<Boolean> ret = literalList.stream().map(literal -> { + InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), false); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // not in predicate + for (int i = 0; i < slotRefs.size(); i++) { + final int loc = i; + List<Boolean> ret = literalList.stream().map(literal -> { + InPredicate expr = new InPredicate(slotRefs.get(loc), Lists.newArrayList(literal), true); + Expression expression = null; + try { + expression = LakeSoulUtils.convertToSubstraitExpr(expr, schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + validPredicateMap.put(expression != null, expr); + return expression != null; + }).collect(Collectors.toList()); + Assert.assertArrayEquals(expects[i], ret.toArray()); + } + + // bool literal + + Expression trueExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(true), schema); + Expression falseExpr = LakeSoulUtils.convertToSubstraitExpr(new BoolLiteral(false), schema); + Assert.assertEquals(SubstraitUtil.CONST_TRUE, trueExpr); + Assert.assertEquals(SubstraitUtil.CONST_FALSE, falseExpr); + 
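The compound-predicate assertions that follow encode the pushdown rules: an OR converts only when both children convert, since dropping either side could filter out matching rows; an AND still converts when one side is unsupported, degrading to the supported conjunct alone, which is safe because scanning a superset of rows and re-filtering in Doris preserves correctness (the test asserts the degraded AND equals the valid side's proto); a NOT requires its child to convert.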
validPredicateMap.put(true, new BoolLiteral(true)); + validPredicateMap.put(true, new BoolLiteral(false)); + + List<Expr> validExprs = validPredicateMap.get(true); + List<Expr> invalidExprs = validPredicateMap.get(false); + // OR predicate + // both valid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < validExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + validExprs.get(i), validExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNotNull("pred: " + orPredicate.toSql(), expression); + } + } + // both invalid + for (int i = 0; i < invalidExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + invalidExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNull("pred: " + orPredicate.toSql(), expression); + } + } + // valid or invalid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate orPredicate = new CompoundPredicate(Operator.OR, + validExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(orPredicate, schema); + Assert.assertNull("pred: " + orPredicate.toSql(), expression); + } + } + + // AND predicate + // both valid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < validExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + validExprs.get(i), validExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNotNull("pred: " + andPredicate.toSql(), expression); + } + } + // both invalid + for (int i = 0; i < invalidExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + invalidExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNull("pred: " + andPredicate.toSql(), expression); + } + } + // valid and invalid + for (int i = 0; i < validExprs.size(); i++) { + for (int j = 0; j < invalidExprs.size(); j++) { + CompoundPredicate andPredicate = new CompoundPredicate(Operator.AND, + validExprs.get(i), invalidExprs.get(j)); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(andPredicate, schema); + Assert.assertNotNull("pred: " + andPredicate.toSql(), expression); + Assert.assertEquals(SubstraitUtil.substraitExprToProto(LakeSoulUtils.convertToSubstraitExpr(validExprs.get(i), schema), "table"), + SubstraitUtil.substraitExprToProto(expression, "table")); + } + } + + // NOT predicate + // valid + for (int i = 0; i < validExprs.size(); i++) { + CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT, + validExprs.get(i), null); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema); + Assert.assertNotNull("pred: " + notPredicate.toSql(), expression); + } + // invalid + for (int i = 0; i < invalidExprs.size(); i++) { + CompoundPredicate notPredicate = new CompoundPredicate(Operator.NOT, + invalidExprs.get(i), null); + Expression expression = LakeSoulUtils.convertToSubstraitExpr(notPredicate, schema); + Assert.assertNull("pred: " + notPredicate.toSql(), expression); + } + } +} diff --git a/fe/pom.xml b/fe/pom.xml index 42ab14bd3dc..b679f466f07 100644 --- a/fe/pom.xml 
+++ b/fe/pom.xml @@ -317,6 +317,8 @@ under the License. <hudi.version>0.14.1</hudi.version> <presto.hadoop.version>2.7.4-11</presto.hadoop.version> <presto.hive.version>3.0.0-8</presto.hive.version> + <!-- lakesoul --> + <lakesoul.version>2.6.2</lakesoul.version> <parquet.version>1.13.1</parquet.version> <commons-collections.version>3.2.2</commons-collections.version> @@ -1824,4 +1826,4 @@ under the License. </snapshots> </repository> </repositories> -</project> \ No newline at end of file +</project> diff --git a/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy new file mode 100644 index 00000000000..01c5d981c49 --- /dev/null +++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_filter.groovy @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_lakesoul_filter", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableLakesoulTest") + // enable this once the docker image is ready to run in regression tests + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String catalog_name = "lakesoul" + String db_name = "default" + String pg_user = context.config.otherConfigs.get("lakesoulPGUser") + String pg_pwd = context.config.otherConfigs.get("lakesoulPGPwd") + String pg_url = context.config.otherConfigs.get("lakesoulPGUrl") + String minio_ak = context.config.otherConfigs.get("lakesoulMinioAK") + String minio_sk = context.config.otherConfigs.get("lakesoulMinioSK") + String minio_endpoint = context.config.otherConfigs.get("lakesoulMinioEndpoint") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog lakesoul properties ( + 'type'='lakesoul', + 'lakesoul.pg.username'='${pg_user}', + 'lakesoul.pg.password'='${pg_pwd}', + 'lakesoul.pg.url'='${pg_url}', + 'minio.endpoint'='${minio_endpoint}', + 'minio.access_key'='${minio_ak}', + 'minio.secret_key'='${minio_sk}' + );""" + + // switch to the target catalog and database + sql """use `${catalog_name}`.`${db_name}`""" + + sql """show tables;""" + // basic selects + sql """select * from region;""" + + sql """select * from nation;""" + + sql """select * from nation where n_regionkey = 0 or n_nationkey > 14;""" + + sql """select * from nation where n_regionkey = 0 and n_nationkey > 0;""" + + sql """select * from nation where n_regionkey = 0;""" + + // aggregate-only query, no columns projected + sql """select count(*) from customer;""" + + // filter by non-partition column + sql """select count(*) from customer where c_mktsegment='BUILDING';""" + + // filter by partition column + sql """select count(*) from customer where c_nationkey=19;""" + + // filter by both partition and non-partition column + sql """select count(*) from customer 
where c_mktsegment='BUILDING' and c_nationkey=19;""" + + sql """select * from lineitem where l_shipdate <= DATE '1992-12-01' limit 10;""" + + sql """select count(*) from part where p_type like 'MEDIUM POLISHED%';""" + } } + diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy index 9369a28e8fe..6a1f83a5e24 100644 --- a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy +++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy @@ -43,18 +43,18 @@ suite("test_external_table_lakesoul", "p2,external,lakesoul,external_remote,exte """ // analyze - sql """use `${catalog_name}`.`${db_name}`""" - - sql q1 - sql q2 - sql q3 - sql q4 - sql q5 - sql q6 - sql q7 - sql q8 - sql q9 - sql q11 + sql """use `${catalog_name}`.`${db_name}`""" + + sql q1 + sql q2 + sql q3 + sql q4 + sql q5 + sql q6 + sql q7 + sql q8 + sql q9 + sql q11 } }
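Taken together, the scan-node changes ship the pushed-down filter to the BE-side native reader through each split's options JSON. Because a Substrait plan is a binary protobuf, it is base64-encoded before being embedded; a minimal sketch of that step, mirroring setLakeSoulParams above, where predicate is assumed to come from LakeSoulUtils.getPushPredicate and may be null when nothing is pushable:

    JSONObject options = new JSONObject();          // com.alibaba.fastjson.JSONObject
    if (predicate != null) {                        // io.substrait.proto.Plan
        // base64 keeps the binary Substrait plan intact inside the JSON string handed to the reader
        options.put(LakeSoulUtils.SUBSTRAIT_PREDICATE, SubstraitUtil.encodeBase64String(predicate));
    }
    fileDesc.setOptions(JSON.toJSONString(options)); // one options payload per split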