This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d31bca199f0a4f66de1ac2400f86c8c58a2ac0af
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Apr 10 20:39:53 2024 +0800

    [feature](iceberg) Add new DDL syntax to create Iceberg partitioned tables (#33338)
    
    Support partition by, for example:
    
    ```
    create table tb1 (c1 string, ts datetime)
    engine = iceberg
    partition by (c1, day(ts)) ()
    properties ("a"="b")
    ```
---
 .../datasource/iceberg/DorisTypeToIcebergType.java |  14 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |   2 +-
 .../doris/datasource/iceberg/IcebergUtils.java     |  94 +++++-----
 .../trees/plans/commands/info/CreateTableInfo.java |   4 +
 .../datasource/iceberg/CreateIcebergTableTest.java | 196 +++++++++++++++++++++
 5 files changed, 247 insertions(+), 63 deletions(-)
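Besides identity columns and `day`, the new `partition by` clause accepts the other Iceberg transforms handled by this change (bucket, year, month, hour, truncate). A sketch of the fuller syntax, mirroring the cases exercised by the new CreateIcebergTableTest further down; table and column names here are illustrative only:

```
create table tb2 (id int, ts datetime, dt date, s string)
engine = iceberg
partition by (
    id,                -- identity partition on a column
    bucket(2, id),     -- hash into 2 buckets
    year(ts),
    day(dt),
    truncate(10, s)    -- truncate string values to width 10
) ()
properties ("a"="b")
```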
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
index d6370c583da..52e4b6cf17a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java
@@ -102,27 +102,19 @@ public class DorisTypeToIcebergType extends DorisTypeVisitor<Type> {
         PrimitiveType primitiveType = atomic.getPrimitiveType();
         if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
             return Types.BooleanType.get();
-        } else if (primitiveType.equals(PrimitiveType.TINYINT)
-                || primitiveType.equals(PrimitiveType.SMALLINT)
-                || primitiveType.equals(PrimitiveType.INT)) {
+        } else if (primitiveType.equals(PrimitiveType.INT)) {
             return Types.IntegerType.get();
-        } else if (primitiveType.equals(PrimitiveType.BIGINT)
-                || primitiveType.equals(PrimitiveType.LARGEINT)) {
+        } else if (primitiveType.equals(PrimitiveType.BIGINT)) {
             return Types.LongType.get();
         } else if (primitiveType.equals(PrimitiveType.FLOAT)) {
             return Types.FloatType.get();
         } else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
             return Types.DoubleType.get();
-        } else if (primitiveType.equals(PrimitiveType.CHAR)
-                || primitiveType.equals(PrimitiveType.VARCHAR)
-                || primitiveType.equals(PrimitiveType.STRING)) {
+        } else if (primitiveType.equals(PrimitiveType.STRING)) {
             return Types.StringType.get();
         } else if (primitiveType.equals(PrimitiveType.DATE)
                 || primitiveType.equals(PrimitiveType.DATEV2)) {
             return Types.DateType.get();
-        } else if (primitiveType.equals(PrimitiveType.TIME)
-                || primitiveType.equals(PrimitiveType.TIMEV2)) {
-            return Types.TimeType.get();
         } else if (primitiveType.equals(PrimitiveType.DECIMALV2)
                 || primitiveType.isDecimalV3Type()) {
             return Types.DecimalType.of(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 0c188fae301..18efd7f1b7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -149,7 +149,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
         Schema schema = new Schema(visit.asNestedType().asStructType().fields());
         Map<String, String> properties = stmt.getProperties();
         properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
-        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
+        PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
         catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
         db.setUnInitialized(true);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 08c4be4ceac..70c384a0c4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -25,10 +25,12 @@ import org.apache.doris.analysis.DateLiteral;
 import org.apache.doris.analysis.DecimalLiteral;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FloatLiteral;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.InPredicate;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
@@ -63,8 +65,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Iceberg utils
@@ -78,7 +78,6 @@ public class IcebergUtils {
         }
     };
     static long MILLIS_TO_NANO_TIME = 1000;
-    private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)");
     // https://iceberg.apache.org/spec/#schemas-and-data-types
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
@@ -415,57 +414,51 @@ public class IcebergUtils {
         return slotRef;
     }
 
-    // "partition"="c1;day(c1);bucket(4,c3)"
-    public static PartitionSpec solveIcebergPartitionSpec(Map<String, String> properties, Schema schema)
+    public static PartitionSpec solveIcebergPartitionSpec(PartitionDesc partitionDesc, Schema schema)
             throws UserException {
-        if (properties.containsKey("partition")) {
-            PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-            String par = properties.get("partition").replaceAll(" ", "");
-            String[] pars = par.split(";");
-            for (String func : pars) {
-                if (func.contains("(")) {
-                    Matcher matcher = PARTITION_REG.matcher(func);
-                    if (matcher.matches()) {
-                        switch (matcher.group(1).toLowerCase()) {
-                            case "bucket":
-                                builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            case "year":
-                            case "years":
-                                builder.year(matcher.group(3));
-                                break;
-                            case "month":
-                            case "months":
-                                builder.month(matcher.group(3));
-                                break;
-                            case "date":
-                            case "day":
-                            case "days":
-                                builder.day(matcher.group(3));
-                                break;
-                            case "date_hour":
-                            case "hour":
-                            case "hours":
-                                builder.hour(matcher.group(3));
-                                break;
-                            case "truncate":
-                                builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2)));
-                                break;
-                            default:
-                                throw new UserException("unsupported partition for " + matcher.group(1));
-                        }
-                    } else {
-                        throw new UserException("failed to get partition info from " + func);
-                    }
-                } else {
-                    builder.identity(func);
+        if (partitionDesc == null) {
+            return PartitionSpec.unpartitioned();
+        }
+
+        ArrayList<Expr> partitionExprs = partitionDesc.getPartitionExprs();
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        for (Expr expr : partitionExprs) {
+            if (expr instanceof SlotRef) {
+                builder.identity(((SlotRef) expr).getColumnName());
+            } else if (expr instanceof FunctionCallExpr) {
+                String exprName = expr.getExprName();
+                List<Expr> params = ((FunctionCallExpr) expr).getParams().exprs();
+                switch (exprName.toLowerCase()) {
+                    case "bucket":
+                        builder.bucket(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    case "year":
+                    case "years":
+                        builder.year(params.get(0).getExprName());
+                        break;
+                    case "month":
+                    case "months":
+                        builder.month(params.get(0).getExprName());
+                        break;
+                    case "date":
+                    case "day":
+                    case "days":
+                        builder.day(params.get(0).getExprName());
+                        break;
+                    case "date_hour":
+                    case "hour":
+                    case "hours":
+                        builder.hour(params.get(0).getExprName());
+                        break;
+                    case "truncate":
+                        builder.truncate(params.get(1).getExprName(), Integer.parseInt(params.get(0).getStringValue()));
+                        break;
+                    default:
+                        throw new UserException("unsupported partition for " + exprName);
                 }
             }
-            properties.remove("partition");
-            return builder.build();
-        } else {
-            return PartitionSpec.unpartitioned();
         }
+        return builder.build();
     }
 
     private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
@@ -567,5 +560,4 @@ public class IcebergUtils {
         }
         return -1;
     }
-
 }
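For the example in the commit message, `partition by (c1, day(ts))`, the rewritten solveIcebergPartitionSpec maps the identity column and the `day(...)` call onto Iceberg's PartitionSpec builder. A minimal standalone sketch of the equivalent spec built directly against the Iceberg API (not part of the patch; the schema and field IDs are illustrative only):

```
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionSpecSketch {
    public static void main(String[] args) {
        // Illustrative schema matching the commit-message example: c1 string, ts datetime.
        Schema schema = new Schema(
                Types.NestedField.optional(1, "c1", Types.StringType.get()),
                Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()));

        // partition by (c1, day(ts)) -> identity("c1") + day("ts")
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("c1")
                .day("ts")
                .build();

        // Prints the two partition fields: an identity partition on c1 and a day transform on ts.
        System.out.println(spec);
    }
}
```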
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 0fc746da4e7..4a5a547f022 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -773,4 +773,8 @@ public class CreateTableInfo {
                 partitionDesc, distributionDesc, Maps.newHashMap(properties), extProperties,
                 comment, addRollups, null);
     }
+
+    public void setIsExternal(boolean isExternal) {
+        this.isExternal = isExternal;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
new file mode 100644
index 00000000000..863607af9d7
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/CreateIcebergTableTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.analysis.CreateCatalogStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DbName;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogFactory;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+public class CreateIcebergTableTest {
+
+    public static String warehouse;
+    public static IcebergHadoopExternalCatalog icebergCatalog;
+    public static IcebergMetadataOps ops;
+    public static String dbName = "testdb";
+    public static ConnectContext connectContext;
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        Path warehousePath = Files.createTempDirectory("test_warehouse_");
+        warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+
+        HashMap<String, String> param = new HashMap<>();
+        param.put("type", "iceberg");
+        param.put("iceberg.catalog.type", "hadoop");
+        param.put("warehouse", warehouse);
+
+        // create catalog
+        CreateCatalogStmt createCatalogStmt = new CreateCatalogStmt(true, "iceberg", "", param, "comment");
+        icebergCatalog = (IcebergHadoopExternalCatalog) CatalogFactory.createFromStmt(1, createCatalogStmt);
+        icebergCatalog.setInitialized(true);
+
+        // create db
+        ops = new IcebergMetadataOps(icebergCatalog, icebergCatalog.getCatalog());
+        CreateDbStmt createDbStmt = new CreateDbStmt(true, new DbName("iceberg", dbName), null);
+        ops.createDb(createDbStmt);
+        icebergCatalog.setInitialized(true);
+        IcebergExternalDatabase db = new IcebergExternalDatabase(icebergCatalog, 1L, dbName);
+        icebergCatalog.addDatabaseForTest(db);
+
+        // context
+        connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+    }
+
+    @Test
+    public void testSimpleTable() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " (id int) engine = iceberg";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(1, schema.columns().size());
+        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+    }
+
+    @Test
+    public void testProperties() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " (id int) engine = iceberg properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(1, schema.columns().size());
+        Assert.assertEquals(PartitionSpec.unpartitioned(), table.spec());
+        Assert.assertEquals("b", table.properties().get("a"));
+    }
+
+    @Test
+    public void testType() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " ("
+                + "c0 int, "
+                + "c1 bigint, "
+                + "c2 float, "
+                + "c3 double, "
+                + "c4 string, "
+                + "c5 date, "
+                + "c6 decimal(20, 10), "
+                + "c7 datetime"
+                + ") engine = iceberg "
+                + "properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        List<Types.NestedField> columns = schema.columns();
+        Assert.assertEquals(8, columns.size());
+        Assert.assertEquals(Type.TypeID.INTEGER, columns.get(0).type().typeId());
+        Assert.assertEquals(Type.TypeID.LONG, columns.get(1).type().typeId());
+        Assert.assertEquals(Type.TypeID.FLOAT, columns.get(2).type().typeId());
+        Assert.assertEquals(Type.TypeID.DOUBLE, columns.get(3).type().typeId());
+        Assert.assertEquals(Type.TypeID.STRING, columns.get(4).type().typeId());
+        Assert.assertEquals(Type.TypeID.DATE, columns.get(5).type().typeId());
+        Assert.assertEquals(Type.TypeID.DECIMAL, columns.get(6).type().typeId());
+        Assert.assertEquals(Type.TypeID.TIMESTAMP, columns.get(7).type().typeId());
+    }
+
+    @Test
+    public void testPartition() throws UserException {
+        TableIdentifier tb = TableIdentifier.of(dbName, getTableName());
+        String sql = "create table " + tb + " ("
+                + "id int, "
+                + "ts1 datetime, "
+                + "ts2 datetime, "
+                + "ts3 datetime, "
+                + "ts4 datetime, "
+                + "dt1 date, "
+                + "dt2 date, "
+                + "dt3 date, "
+                + "s string"
+                + ") engine = iceberg "
+                + "partition by ("
+                + "id, "
+                + "bucket(2, id), "
+                + "year(ts1), "
+                + "year(dt1), "
+                + "month(ts2), "
+                + "month(dt2), "
+                + "day(ts3), "
+                + "day(dt3), "
+                + "hour(ts4), "
+                + "truncate(10, s)) ()"
+                + "properties(\"a\"=\"b\")";
+        createTable(sql);
+        Table table = ops.getCatalog().loadTable(tb);
+        Schema schema = table.schema();
+        Assert.assertEquals(9, schema.columns().size());
+        PartitionSpec spec = PartitionSpec.builderFor(schema)
+                .identity("id")
+                .bucket("id", 2)
+                .year("ts1")
+                .year("dt1")
+                .month("ts2")
+                .month("dt2")
+                .day("ts3")
+                .day("dt3")
+                .hour("ts4")
+                .truncate("s", 10)
+                .build();
+        Assert.assertEquals(spec, table.spec());
+        Assert.assertEquals("b", table.properties().get("a"));
+    }
+
+    public void createTable(String sql) throws UserException {
+        LogicalPlan plan = new NereidsParser().parseSingle(sql);
+        Assertions.assertTrue(plan instanceof CreateTableCommand);
+        CreateTableInfo createTableInfo = ((CreateTableCommand) plan).getCreateTableInfo();
+        createTableInfo.setIsExternal(true);
+        CreateTableStmt createTableStmt = createTableInfo.translateToLegacyStmt();
+        ops.createTable(createTableStmt);
+    }
+
+    public String getTableName() {
+        String s = "test_tb_" + UUID.randomUUID();
+        return s.replaceAll("-", "");
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org