morningman commented on code in PR #36289:
URL: https://github.com/apache/doris/pull/36289#discussion_r1641604020


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -59,140 +66,114 @@ public void 
updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
         }
     }
 
-    public void beginInsert(String dbName, String tbName) {
-        Table icebergTable = 
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
-        transaction = icebergTable.newTransaction();
+    public void pendingCommit(SimpleTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+        this.transaction = getNativeTable(tableInfo).newTransaction();
     }
 
-    public void finishInsert() {
-        Table icebergTable = transaction.table();
-        AppendFiles appendFiles = transaction.newAppend();
-
-        for (CommitTaskData task : convertToCommitTaskData()) {
-            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withPath(task.getPath())
-                    .withFileSizeInBytes(task.getFileSizeInBytes())
-                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
-                    .withMetrics(task.getMetrics());
-
-            if (icebergTable.spec().isPartitioned()) {
-                List<String> partitionValues = task.getPartitionValues()
-                        .orElseThrow(() -> new VerifyException("No partition 
data for partitioned table"));
-                builder.withPartitionValues(partitionValues);
-            }
-            appendFiles.appendFile(builder.build());
-        }
+    public void preCommit(SimpleTableInfo tableInfo, 
Optional<InsertCommandContext> insertCtx) {
 
-        // in appendFiles.commit, it will generate metadata(manifest and 
snapshot)
-        // after appendFiles.commit, in current transaction, you can already 
see the new snapshot
-        appendFiles.commit();
-    }
+        LOG.info("iceberg table {} insert table finished!", tableInfo);
 
-    public List<CommitTaskData> convertToCommitTaskData() {
-        List<CommitTaskData> commitTaskData = new ArrayList<>();
-        for (TIcebergCommitData data : this.commitDataList) {
-            commitTaskData.add(new CommitTaskData(
-                    data.getFilePath(),
-                    data.getFileSize(),
-                    new Metrics(
-                            data.getRowCount(),
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP
-                    ),
-                    data.isSetPartitionValues() ? 
Optional.of(data.getPartitionValues()) : Optional.empty(),
-                    convertToFileContent(data.getFileContent()),
-                    data.isSetReferencedDataFiles() ? 
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
-            ));
+        //create  start the  iceberg transaction
+        TUpdateMode updateMode = TUpdateMode.APPEND;
+        if (insertCtx.isPresent()) {
+            updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+                    : TUpdateMode.APPEND;
         }
-        return commitTaskData;
+        updateManifestAfterInsert(updateMode);
     }
 
-    private FileContent convertToFileContent(TFileContent content) {
-        if (content.equals(TFileContent.DATA)) {
-            return FileContent.DATA;
-        } else if (content.equals(TFileContent.POSITION_DELETES)) {
-            return FileContent.POSITION_DELETES;
+    private void updateManifestAfterInsert(TUpdateMode updateMode) {
+
+        Table table = getNativeTable(tableInfo);
+        PartitionSpec spec = table.spec();
+        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+        //convert commitDataList to writeResult
+        WriteResult writeResult = IcebergWriterHelper
+                .convertToWriterResult(fileFormat, spec, commitDataList);
+        List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+        if (spec.isPartitioned()) {
+            LOG.info("{} {} table  partition manifest ...", tableInfo, 
updateMode);

Review Comment:
   Please change these logs to debug level and keep only the necessary info-level logs, guarded like this:
   ```
   if (LOG.isDebugEnabled()) {
       LOG.debug("...");
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -59,140 +66,114 @@ public void 
updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
         }
     }
 
-    public void beginInsert(String dbName, String tbName) {
-        Table icebergTable = 
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
-        transaction = icebergTable.newTransaction();
+    public void pendingCommit(SimpleTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+        this.transaction = getNativeTable(tableInfo).newTransaction();
     }
 
-    public void finishInsert() {
-        Table icebergTable = transaction.table();
-        AppendFiles appendFiles = transaction.newAppend();
-
-        for (CommitTaskData task : convertToCommitTaskData()) {
-            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withPath(task.getPath())
-                    .withFileSizeInBytes(task.getFileSizeInBytes())
-                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
-                    .withMetrics(task.getMetrics());
-
-            if (icebergTable.spec().isPartitioned()) {
-                List<String> partitionValues = task.getPartitionValues()
-                        .orElseThrow(() -> new VerifyException("No partition 
data for partitioned table"));
-                builder.withPartitionValues(partitionValues);
-            }
-            appendFiles.appendFile(builder.build());
-        }
+    public void preCommit(SimpleTableInfo tableInfo, 
Optional<InsertCommandContext> insertCtx) {
 
-        // in appendFiles.commit, it will generate metadata(manifest and 
snapshot)
-        // after appendFiles.commit, in current transaction, you can already 
see the new snapshot
-        appendFiles.commit();
-    }
+        LOG.info("iceberg table {} insert table finished!", tableInfo);
 
-    public List<CommitTaskData> convertToCommitTaskData() {
-        List<CommitTaskData> commitTaskData = new ArrayList<>();
-        for (TIcebergCommitData data : this.commitDataList) {
-            commitTaskData.add(new CommitTaskData(
-                    data.getFilePath(),
-                    data.getFileSize(),
-                    new Metrics(
-                            data.getRowCount(),
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP
-                    ),
-                    data.isSetPartitionValues() ? 
Optional.of(data.getPartitionValues()) : Optional.empty(),
-                    convertToFileContent(data.getFileContent()),
-                    data.isSetReferencedDataFiles() ? 
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
-            ));
+        //create  start the  iceberg transaction

Review Comment:
   ```suggestion
           // create and start the iceberg transaction
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.statistics;
+
+public class CommonStatistics {

Review Comment:
   Please add a comment for this class explaining what CommonStatistics is
   and why it is needed, since I only see it being used by Iceberg.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java:
##########
@@ -59,140 +66,114 @@ public void 
updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
         }
     }
 
-    public void beginInsert(String dbName, String tbName) {
-        Table icebergTable = 
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
-        transaction = icebergTable.newTransaction();
+    public void pendingCommit(SimpleTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+        this.transaction = getNativeTable(tableInfo).newTransaction();
     }
 
-    public void finishInsert() {
-        Table icebergTable = transaction.table();
-        AppendFiles appendFiles = transaction.newAppend();
-
-        for (CommitTaskData task : convertToCommitTaskData()) {
-            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withPath(task.getPath())
-                    .withFileSizeInBytes(task.getFileSizeInBytes())
-                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
-                    .withMetrics(task.getMetrics());
-
-            if (icebergTable.spec().isPartitioned()) {
-                List<String> partitionValues = task.getPartitionValues()
-                        .orElseThrow(() -> new VerifyException("No partition 
data for partitioned table"));
-                builder.withPartitionValues(partitionValues);
-            }
-            appendFiles.appendFile(builder.build());
-        }
+    public void preCommit(SimpleTableInfo tableInfo, 
Optional<InsertCommandContext> insertCtx) {
 
-        // in appendFiles.commit, it will generate metadata(manifest and 
snapshot)
-        // after appendFiles.commit, in current transaction, you can already 
see the new snapshot
-        appendFiles.commit();
-    }
+        LOG.info("iceberg table {} insert table finished!", tableInfo);
 
-    public List<CommitTaskData> convertToCommitTaskData() {
-        List<CommitTaskData> commitTaskData = new ArrayList<>();
-        for (TIcebergCommitData data : this.commitDataList) {
-            commitTaskData.add(new CommitTaskData(
-                    data.getFilePath(),
-                    data.getFileSize(),
-                    new Metrics(
-                            data.getRowCount(),
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP
-                    ),
-                    data.isSetPartitionValues() ? 
Optional.of(data.getPartitionValues()) : Optional.empty(),
-                    convertToFileContent(data.getFileContent()),
-                    data.isSetReferencedDataFiles() ? 
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
-            ));
+        //create  start the  iceberg transaction
+        TUpdateMode updateMode = TUpdateMode.APPEND;
+        if (insertCtx.isPresent()) {
+            updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+                    : TUpdateMode.APPEND;
         }
-        return commitTaskData;
+        updateManifestAfterInsert(updateMode);
     }
 
-    private FileContent convertToFileContent(TFileContent content) {
-        if (content.equals(TFileContent.DATA)) {
-            return FileContent.DATA;
-        } else if (content.equals(TFileContent.POSITION_DELETES)) {
-            return FileContent.POSITION_DELETES;
+    private void updateManifestAfterInsert(TUpdateMode updateMode) {
+
+        Table table = getNativeTable(tableInfo);
+        PartitionSpec spec = table.spec();
+        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+        //convert commitDataList to writeResult
+        WriteResult writeResult = IcebergWriterHelper
+                .convertToWriterResult(fileFormat, spec, commitDataList);
+        List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+        if (spec.isPartitioned()) {
+            LOG.info("{} {} table  partition manifest ...", tableInfo, 
updateMode);
+            partitionManifestUp(updateMode, table, pendingResults);
+            LOG.info("{} {} table partition manifest  successful and 
writeResult : {}..", tableInfo, updateMode,
+                    writeResult);
         } else {
-            return FileContent.EQUALITY_DELETES;
+            LOG.info("{} {} table  manifest ...", tableInfo, updateMode);
+            tableManifestUp(updateMode, table, pendingResults);
+            LOG.info("{} {}  table  manifest  successful and writeResult : 
{}..", tableInfo, updateMode, writeResult);
         }
     }
 
     @Override
     public void commit() throws UserException {
-        // Externally readable
-        // Manipulate the relevant data so that others can also see the latest 
table, such as:
-        //   1. hadoop: it will change the version number information in 
'version-hint.text'
-        //   2. hive: it will change the table properties, the most important 
thing is to revise 'metadata_location'
-        //   3. and so on ...
+        // commit the iceberg transaction
         transaction.commitTransaction();
     }
 
     @Override
     public void rollback() {
-
+        //do nothing
     }
 
-    public long getUpdateCnt() {
-        return 
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
+
+    private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
+        Objects.requireNonNull(tableInfo);
+        IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+        return IcebergUtils.getIcebergTable(externalCatalog, tableInfo);
     }
 
-    public static class CommitTaskData {
-        private final String path;
-        private final long fileSizeInBytes;
-        private final Metrics metrics;
-        private final Optional<List<String>> partitionValues;
-        private final FileContent content;
-        private final Optional<List<String>> referencedDataFiles;
-
-        public CommitTaskData(String path,
-                              long fileSizeInBytes,
-                              Metrics metrics,
-                              Optional<List<String>> partitionValues,
-                              FileContent content,
-                              Optional<List<String>> referencedDataFiles) {
-            this.path = path;
-            this.fileSizeInBytes = fileSizeInBytes;
-            this.metrics = metrics;
-            this.partitionValues = 
convertPartitionValuesForNull(partitionValues);
-            this.content = content;
-            this.referencedDataFiles = referencedDataFiles;
+    private void partitionManifestUp(TUpdateMode updateMode, Table table, 
List<WriteResult> pendingResults) {

Review Comment:
   ```suggestion
       private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to