BePPPower commented on code in PR #21911:
URL: https://github.com/apache/doris/pull/21911#discussion_r1271810931


##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -266,17 +270,83 @@ private void generateQueryStmt() {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
+        for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+            LOG.info("Outfile clause {} is responsible for tables: {}", i,
+                    tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
+        }
+        for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
+            FromClause fromClause = new FromClause(tableRefList);
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtList.add(selectStmt);
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+        // get tablets
+        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
+        OlapTable table = 
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        List<Long> tabletIdList = Lists.newArrayList();
+        table.readLock();
+        try {
+            Collection<Partition> partitions = new ArrayList<Partition>();
+            // get partitions
+            // user specifies partitions, already checked in ExportStmt
+            if (partitionNames != null) {
+                for (String partName : partitionNames) {
+                    partitions.add(table.getPartition(partName));
+                }
+            } else {
+                partitions = table.getPartitions();
+            }
+
+            // get tablets
+            for (Partition partition : partitions) {
+                partition.getVisibleVersion();
+                partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
+                    tabletIdList.addAll(index.getTabletIdsInOrder());
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+
+        Integer tabletsAllNum = tabletIdList.size();
+        Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
+        Integer tabletsNumPerQueryRemainder = tabletsAllNum - 
tabletsNumPerQuery * this.parallelNum;
 
-        SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
-                null, null, LimitElement.NO_LIMIT);
-        // generate outfile clause
-        OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
-        selectStmt.setOutFileClause(outfile);
-        selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
-        selectStmtList.add(selectStmt);
+        Integer start = 0;
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
Lists.newArrayList();
+
+        int outfileNum = this.parallelNum;
+        if (tabletsAllNum < this.parallelNum) {
+            outfileNum = tabletsAllNum;
+            LOG.warn("The number of tablets is smaller than parallel_num, set 
parallel_num to tablets num.");
+        }
+        for (int i = 0; i < outfileNum; ++i) {
+            Integer tabletsNum = tabletsNumPerQuery;
+            if (tabletsNumPerQueryRemainder > 0) {
+                tabletsNum = tabletsNum + 1;
+                --tabletsNumPerQueryRemainder;
+            }
+            ArrayList<Long> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
+            start += tabletsNum;
+            TableRef tblRef = new TableRef(this.tableRef.getName(), 
this.tableRef.getAlias(),

Review Comment:
   These tablets all come from the user-specified partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to