morningman commented on code in PR #36289: URL: https://github.com/apache/doris/pull/36289#discussion_r1641604020
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -59,140 +66,114 @@ public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) { } } - public void beginInsert(String dbName, String tbName) { - Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); - transaction = icebergTable.newTransaction(); + public void pendingCommit(SimpleTableInfo tableInfo) { + this.tableInfo = tableInfo; + this.transaction = getNativeTable(tableInfo).newTransaction(); } - public void finishInsert() { - Table icebergTable = transaction.table(); - AppendFiles appendFiles = transaction.newAppend(); - - for (CommitTaskData task : convertToCommitTaskData()) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(IcebergUtils.getFileFormat(icebergTable)) - .withMetrics(task.getMetrics()); - - if (icebergTable.spec().isPartitioned()) { - List<String> partitionValues = task.getPartitionValues() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartitionValues(partitionValues); - } - appendFiles.appendFile(builder.build()); - } + public void preCommit(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { - // in appendFiles.commit, it will generate metadata(manifest and snapshot) - // after appendFiles.commit, in current transaction, you can already see the new snapshot - appendFiles.commit(); - } + LOG.info("iceberg table {} insert table finished!", tableInfo); - public List<CommitTaskData> convertToCommitTaskData() { - List<CommitTaskData> commitTaskData = new ArrayList<>(); - for (TIcebergCommitData data : this.commitDataList) { - commitTaskData.add(new CommitTaskData( - data.getFilePath(), - data.getFileSize(), - new Metrics( - data.getRowCount(), - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - 
Collections.EMPTY_MAP, - Collections.EMPTY_MAP - ), - data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), - convertToFileContent(data.getFileContent()), - data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty() - )); + //create start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; } - return commitTaskData; + updateManifestAfterInsert(updateMode); } - private FileContent convertToFileContent(TFileContent content) { - if (content.equals(TFileContent.DATA)) { - return FileContent.DATA; - } else if (content.equals(TFileContent.POSITION_DELETES)) { - return FileContent.POSITION_DELETES; + private void updateManifestAfterInsert(TUpdateMode updateMode) { + + Table table = getNativeTable(tableInfo); + PartitionSpec spec = table.spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(table); + + //convert commitDataList to writeResult + WriteResult writeResult = IcebergWriterHelper + .convertToWriterResult(fileFormat, spec, commitDataList); + List<WriteResult> pendingResults = Lists.newArrayList(writeResult); + + if (spec.isPartitioned()) { + LOG.info("{} {} table partition manifest ...", tableInfo, updateMode); Review Comment: change this logs to debug, only leave necessary log. 
``` if (LOG.isDebugEnabled()) { LOG.debug(xxx); } ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -59,140 +66,114 @@ public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) { } } - public void beginInsert(String dbName, String tbName) { - Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); - transaction = icebergTable.newTransaction(); + public void pendingCommit(SimpleTableInfo tableInfo) { + this.tableInfo = tableInfo; + this.transaction = getNativeTable(tableInfo).newTransaction(); } - public void finishInsert() { - Table icebergTable = transaction.table(); - AppendFiles appendFiles = transaction.newAppend(); - - for (CommitTaskData task : convertToCommitTaskData()) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(IcebergUtils.getFileFormat(icebergTable)) - .withMetrics(task.getMetrics()); - - if (icebergTable.spec().isPartitioned()) { - List<String> partitionValues = task.getPartitionValues() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartitionValues(partitionValues); - } - appendFiles.appendFile(builder.build()); - } + public void preCommit(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { - // in appendFiles.commit, it will generate metadata(manifest and snapshot) - // after appendFiles.commit, in current transaction, you can already see the new snapshot - appendFiles.commit(); - } + LOG.info("iceberg table {} insert table finished!", tableInfo); - public List<CommitTaskData> convertToCommitTaskData() { - List<CommitTaskData> commitTaskData = new ArrayList<>(); - for (TIcebergCommitData data : this.commitDataList) { - commitTaskData.add(new CommitTaskData( - data.getFilePath(), - data.getFileSize(), - new Metrics( - data.getRowCount(), - Collections.EMPTY_MAP, -
Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP - ), - data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), - convertToFileContent(data.getFileContent()), - data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty() - )); + //create start the iceberg transaction Review Comment: ```suggestion // create and start the iceberg transaction ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.statistics; + +public class CommonStatistics { Review Comment: Add comment for this class. What is CommonStatistics and why need it? 
I ask because I only see it being used in Iceberg. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -59,140 +66,114 @@ public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) { } } - public void beginInsert(String dbName, String tbName) { - Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); - transaction = icebergTable.newTransaction(); + public void pendingCommit(SimpleTableInfo tableInfo) { + this.tableInfo = tableInfo; + this.transaction = getNativeTable(tableInfo).newTransaction(); } - public void finishInsert() { - Table icebergTable = transaction.table(); - AppendFiles appendFiles = transaction.newAppend(); - - for (CommitTaskData task : convertToCommitTaskData()) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - .withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(IcebergUtils.getFileFormat(icebergTable)) - .withMetrics(task.getMetrics()); - - if (icebergTable.spec().isPartitioned()) { - List<String> partitionValues = task.getPartitionValues() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartitionValues(partitionValues); - } - appendFiles.appendFile(builder.build()); - } + public void preCommit(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { - // in appendFiles.commit, it will generate metadata(manifest and snapshot) - // after appendFiles.commit, in current transaction, you can already see the new snapshot - appendFiles.commit(); - } + LOG.info("iceberg table {} insert table finished!", tableInfo); - public List<CommitTaskData> convertToCommitTaskData() { - List<CommitTaskData> commitTaskData = new ArrayList<>(); - for (TIcebergCommitData data : this.commitDataList) { - commitTaskData.add(new CommitTaskData( - data.getFilePath(), - data.getFileSize(), - new Metrics( - data.getRowCount(), - Collections.EMPTY_MAP, -
Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP - ), - data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), - convertToFileContent(data.getFileContent()), - data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty() - )); + //create start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; } - return commitTaskData; + updateManifestAfterInsert(updateMode); } - private FileContent convertToFileContent(TFileContent content) { - if (content.equals(TFileContent.DATA)) { - return FileContent.DATA; - } else if (content.equals(TFileContent.POSITION_DELETES)) { - return FileContent.POSITION_DELETES; + private void updateManifestAfterInsert(TUpdateMode updateMode) { + + Table table = getNativeTable(tableInfo); + PartitionSpec spec = table.spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(table); + + //convert commitDataList to writeResult + WriteResult writeResult = IcebergWriterHelper + .convertToWriterResult(fileFormat, spec, commitDataList); + List<WriteResult> pendingResults = Lists.newArrayList(writeResult); + + if (spec.isPartitioned()) { + LOG.info("{} {} table partition manifest ...", tableInfo, updateMode); + partitionManifestUp(updateMode, table, pendingResults); + LOG.info("{} {} table partition manifest successful and writeResult : {}..", tableInfo, updateMode, + writeResult); } else { - return FileContent.EQUALITY_DELETES; + LOG.info("{} {} table manifest ...", tableInfo, updateMode); + tableManifestUp(updateMode, table, pendingResults); + LOG.info("{} {} table manifest successful and writeResult : {}..", tableInfo, updateMode, writeResult); } } @Override public void commit() throws UserException { - // Externally readable - // Manipulate the relevant data so that others 
can also see the latest table, such as: - // 1. hadoop: it will change the version number information in 'version-hint.text' - // 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location' - // 3. and so on ... + // commit the iceberg transaction transaction.commitTransaction(); } @Override public void rollback() { - + //do nothing } - public long getUpdateCnt() { - return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); + + private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { + Objects.requireNonNull(tableInfo); + IcebergExternalCatalog externalCatalog = ops.getExternalCatalog(); + return IcebergUtils.getIcebergTable(externalCatalog, tableInfo); } - public static class CommitTaskData { - private final String path; - private final long fileSizeInBytes; - private final Metrics metrics; - private final Optional<List<String>> partitionValues; - private final FileContent content; - private final Optional<List<String>> referencedDataFiles; - - public CommitTaskData(String path, - long fileSizeInBytes, - Metrics metrics, - Optional<List<String>> partitionValues, - FileContent content, - Optional<List<String>> referencedDataFiles) { - this.path = path; - this.fileSizeInBytes = fileSizeInBytes; - this.metrics = metrics; - this.partitionValues = convertPartitionValuesForNull(partitionValues); - this.content = content; - this.referencedDataFiles = referencedDataFiles; + private void partitionManifestUp(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { Review Comment: ```suggestion private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org