morningman commented on code in PR #22854:
URL: https://github.com/apache/doris/pull/22854#discussion_r1294141076


##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -81,27 +81,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
-// NOTE: we must be carefully if we send next request
-//       as soon as receiving one instance's report from one BE,
-//       because we may change job's member concurrently.
+@Data
 public class ExportJob implements Writable {
     private static final Logger LOG = LogManager.getLogger(ExportJob.class);
 
     private static final String BROKER_PROPERTY_PREFIXES = "broker.";
 
-    public enum JobState {
-        PENDING,
-        IN_QUEUE,
-        EXPORTING,
-        FINISHED,
-        CANCELLED,
-    }
+    private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = 
Config.maximum_tablets_of_outfile_in_export;
+
+    public static final TransientTaskRegister register = new 
ExportTaskRegister(

Review Comment:
   Why do we need this `ExportTaskRegister` to wrap the 
`TransientTaskManager`? 



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -215,55 +220,37 @@ public ExportJob(long jobId) {
         this.id = jobId;
     }
 
-    public void setJob(ExportStmt stmt) throws UserException {
-        String dbName = stmt.getTblName().getDb();
-        Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
-        Preconditions.checkNotNull(stmt.getBrokerDesc());
-        this.brokerDesc = stmt.getBrokerDesc();
-        this.columnSeparator = stmt.getColumnSeparator();
-        this.lineDelimiter = stmt.getLineDelimiter();
-        this.label = stmt.getLabel();
-        this.queryId = ConnectContext.get() != null ? 
DebugUtil.printId(ConnectContext.get().queryId()) : "N/A";
-        String path = stmt.getPath();
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
-        this.whereExpr = stmt.getWhereExpr();
-        this.parallelNum = stmt.getParallelNum();
-        this.exportPath = path;
-        this.sessionVariables = stmt.getSessionVariables();
-        this.timeoutSecond = sessionVariables.getQueryTimeoutS();
-
-        this.qualifiedUser = stmt.getQualifiedUser();
-        this.userIdentity = stmt.getUserIdentity();
-        this.format = stmt.getFormat();
-        this.maxFileSize = stmt.getMaxFileSize();
-        this.deleteExistingFiles = stmt.getDeleteExistingFiles();
-        this.partitionNames = stmt.getPartitions();
-
-        this.exportTable = 
db.getTableOrDdlException(stmt.getTblName().getTbl());
-        this.columns = stmt.getColumns();
-        this.tableRef = stmt.getTableRef();
-        if (!Strings.isNullOrEmpty(this.columns)) {
-            Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
-            this.exportColumns = 
split.splitToList(stmt.getColumns().toLowerCase());
-        }
+    /**
+     * For an ExportJob:
+     * The ExportJob is divided into multiple 'ExportTaskExecutor'
+     * according to the 'parallelism' set by the user.
+     * The tablets which will be exported by this ExportJob are divided into 
'parallelism' copies,
+     * and each ExportTaskExecutor is responsible for a list of tablets.
+     * The tablets responsible for an ExportTaskExecutor will be assigned to 
multiple OutfileStmt
+     * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
+     *
+     * @throws UserException
+     */
+    public void analyze() throws UserException {
         exportTable.readLock();
         try {
-            this.dbId = db.getId();
-            this.tableId = exportTable.getId();
-            this.tableName = stmt.getTblName();
-            if (selectStmtList.isEmpty()) {
-                // This scenario is used for 'EXPORT TABLE tbl INTO PATH'
-                // we need generate Select Statement
-                generateQueryStmt(stmt);
-            }
+            // generateQueryStmtOld
+            generateQueryStmt();
         } finally {
             exportTable.readUnlock();
         }
-        this.sql = stmt.toSql();
-        this.origStmt = stmt.getOrigStmt();
+        generateExportJobExecutor();
+    }
+
+    public void generateExportJobExecutor() {

Review Comment:
   ```suggestion
       private void generateExportJobExecutor() {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -306,30 +302,104 @@ private void generateQueryStmt(ExportStmt stmt) throws 
UserException {
         }
     }
 
-    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+    /**
+     * Generate outfile select stmt
+     * @throws UserException
+     */
+    private void generateQueryStmt() throws UserException {
+        SelectList list = new SelectList();
+        if (exportColumns.isEmpty()) {
+            list.addItem(SelectListItem.createStarItem(this.tableName));
+        } else {
+            for (Column column : exportTable.getBaseSchema()) {
+                String colName = column.getName().toLowerCase();
+                if (exportColumns.contains(colName)) {
+                    SlotRef slotRef = new SlotRef(this.tableName, colName);
+                    SelectListItem selectListItem = new 
SelectListItem(slotRef, null);
+                    list.addItem(selectListItem);
+                }
+            }
+        }
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerParallel = 
getTableRefListPerParallel();
+        LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, 
tableRefListPerParallel.size());
+
+        // debug LOG output
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < tableRefListPerParallel.size(); i++) {
+                LOG.debug("ExportTaskExecutor {} is responsible for tablets:", 
i);
+                for (TableRef tableRef : tableRefListPerParallel.get(i)) {
+                    LOG.debug("Tablet id: [{}]", 
tableRef.getSampleTabletIds());
+                }
+            }
+        }
+
+        // generate 'select..outfile..' statement
+        for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) {

Review Comment:
   There is only one TableRef in `tableRefList`.
   So I think we can just take the first element of `tableRefList` and remove 
the for loop at line 340.
   Also, change `selectStmtListPerParallel` to `List<SelectStmt>` instead of 
`List<List<SelectStmt>>`.
   



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+@Getter
+public class ExportJobStateTransfer implements Writable {

Review Comment:
   `Transfer` is a verb. How about `ExportJobStateInfo`?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java:
##########
@@ -286,13 +290,17 @@ private void checkTable(Env env) throws AnalysisException 
{
 
     public static String checkPath(String path, StorageBackend.StorageType 
type) throws AnalysisException {
         if (Strings.isNullOrEmpty(path)) {
-            throw new AnalysisException("No dest path specified.");
+            throw new AnalysisException("No destination path specified.");
         }
 
         URI uri = URI.create(path);
         String schema = uri.getScheme();
+        if (schema == null) {
+            throw new AnalysisException(
+                    "Invalid export path, there is no schema of URI found. 
please check your path.");
+        }
         if (type == StorageBackend.StorageType.BROKER) {
-            if (schema == null || (!schema.equalsIgnoreCase("bos")
+            if (!schema.equalsIgnoreCase("bos")

Review Comment:
   This check should be in the outfile statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to