morningman commented on code in PR #34257: URL: https://github.com/apache/doris/pull/34257#discussion_r1602988760
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java: ########## @@ -576,12 +585,52 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St } public static String getFileFormat(Table table) { - Snapshot snapshot = table.currentSnapshot(); - if (snapshot == null) { - return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - } else { - return snapshot.summary().getOrDefault( - TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + Map<String, String> properties = table.properties(); + if (properties.containsKey(WRITE_FORMAT)) { + return properties.get(WRITE_FORMAT); + } + if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { + return properties.get(TableProperties.DEFAULT_FILE_FORMAT); + } + return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + } + + public static String getFileCompress(Table table) { + Map<String, String> properties = table.properties(); + if (properties.containsKey(COMPRESSION_CODEC)) { + return properties.get(COMPRESSION_CODEC); + } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) { + return properties.get(SPARK_SQL_COMPRESSION_CODEC); + } + String fileFormat = getFileFormat(table); + if (fileFormat.equalsIgnoreCase("parquet")) { + return properties.getOrDefault( + TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + } else if (fileFormat.equalsIgnoreCase("orc")) { + return properties.getOrDefault( + TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT); + } else if (fileFormat.equalsIgnoreCase("avro")) { Review Comment: We do not support Avro yet. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -83,4 +90,17 @@ public long fetchRowCount() { makeSureInitialized(); return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); } + + public Table getIcebergTable() { Review Comment: use 
`IcebergUtils.getIcebergTable` directly; it fetches the Iceberg table from the cache, so there is no need to hold a table reference here. ########## fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java: ########## @@ -50,16 +49,25 @@ import java.util.Set; import java.util.stream.Collectors; -public class HiveTableSink extends DataSink { +public class HiveTableSink extends BaseExternalTableDataSink { - private HMSExternalTable targetTable; - protected TDataSink tDataSink; + private final HMSExternalTable targetTable; + private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{ + add(TFileFormatType.FORMAT_ORC); + add(TFileFormatType.FORMAT_PARQUET); + add(TFileFormatType.FORMAT_CSV_PLAIN); Review Comment: FORMAT_CSV_PLAIN is not supported yet. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java: ########## @@ -72,6 +76,9 @@ public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> na } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof IcebergExternalCatalog) { Review Comment: I found another issue just above: if `curCatalog` is an `HMSExternalCatalog`, this always creates an `UnboundHiveTableSink`. But an HMS catalog may also contain Hudi and Iceberg tables, so shouldn't we check the table's type too? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java: ########## @@ -101,6 +108,9 @@ public static LogicalSink<? 
extends Plan> createUnboundTableSinkMaybeOverwrite(L } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java: ########## @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.BaseExternalTableDataSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.transaction.TransactionManager; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for base external table + */ +public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); + private static final long INVALID_TXN_ID = -1L; + protected long txnId = INVALID_TXN_ID; + protected TransactionStatus txnStatus = TransactionStatus.ABORTED; + protected final TransactionManager transactionManager; + protected final String catalogName; + protected Optional<SummaryProfile> summaryProfile = Optional.empty(); + + /** + * constructor + */ + public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table, + String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx, + boolean emptyInsert) { + super(ctx, table, labelName, planner, insertCtx, emptyInsert); + 
catalogName = table.getCatalog().getName(); + transactionManager = table.getCatalog().getTransactionManager(); + + if (ConnectContext.get().getExecutor() != null) { + summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); + } + } + + public long getTxnId() { + return txnId; + } + + /** + * collect commit infos from BEs + */ + public abstract void setCollectCommitInfoFunc(); + + @Override + public void beginTransaction() { + txnId = transactionManager.begin(); + setCollectCommitInfoFunc(); + } + + /** + * At this time, FE has successfully collected all commit information from BEs. + * Before commit this txn, commit information need to be analyzed and processed. + */ + protected abstract void doBeforeCommit() throws UserException; Review Comment: It would be better to put all the `abstract` methods together. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org