morningman commented on code in PR #19170: URL: https://github.com/apache/doris/pull/19170#discussion_r1181664051
########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); Review Comment: ```suggestion private static final Logger LOG = LogManager.getLogger(ProgressManager.class); ``` ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + + private Map<Long, Progress> idToProgress = Maps.newConcurrentMap(); + + public void registerProgress(Long id, int scannerNum) { + LOG.info("create " + id + " with initial scannerNum " + scannerNum); Review Comment: ```suggestion LOG.debug("create {} with initial scannerNum {}", id, scannerNum); ``` ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + + private Map<Long, Progress> idToProgress = Maps.newConcurrentMap(); + + public void registerProgress(Long id, int scannerNum) { + LOG.info("create " + id + " with initial scannerNum " + scannerNum); + idToProgress.remove(id); + idToProgress.put(id, new Progress(scannerNum)); + } + + public void registerProgressSimple(Long id) { + registerProgress(id, 0); + } + + public void updateProgress(Long id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum); + } else { + LOG.warn("progress[" + id + "] missing meta infomartion"); + } + } + + public void addTotalScanNums(Long id, int num) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.addTotalScanNums(num); + } + } + + public void deregisterProgress(Long id) { + // TODO: deregister the progress + idToProgress.remove(id); + } + + public Progress getProgressClass(Long id) { + return 
idToProgress.get(id); + } + + public double getProgress(Long id) { + return idToProgress.get(id).getProgress(); + } + + static class Progress { + private Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create(); + private int totalScanNums = 0; + + public synchronized void addTotalScanNums(int num) { + totalScanNums += num; + } + + public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) { + if (finishedScanNums.contains(queryId, fragmentId)) { Review Comment: The `if` and `else` branches are the same. ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java: ########## @@ -760,18 +760,27 @@ public List<Comparable> getShowInfo() throws DdlException { jobInfo.add(state.name()); // progress + // check null + ProgressManager.Progress progressShow = Env.getCurrentProgressManager().getProgressClass(id); + String progress = "ERROR"; + if (progressShow != null) { + int finish = progressShow.getFinishedScanNums(); + int total = progressShow.getTotalScanNums(); + String currentProgress = String.format("%.2f", progressShow.getProgress()); + progress = currentProgress + "% (" + finish + "/" + total + ")"; + } switch (state) { case PENDING: - jobInfo.add("ETL:0%; LOAD:0%"); + jobInfo.add("INITIALING"); Review Comment: ```suggestion jobInfo.add("0%"); ``` ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java: ########## @@ -760,18 +760,27 @@ public List<Comparable> getShowInfo() throws DdlException { jobInfo.add(state.name()); // progress + // check null + ProgressManager.Progress progressShow = Env.getCurrentProgressManager().getProgressClass(id); + String progress = "ERROR"; + if (progressShow != null) { Review Comment: Please extract a method for this logic and move it to `ProgressManager.Progress` ########## fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java: ########## @@ -2115,9 +2127,37 @@ private void computeScanRangeAssignmentByScheduler( // Volume is 
optional, so we need to set the value and the is-set bit scanRangeParams.setVolumeId(minLocation.volume_id); scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); } } + private void updateScanRangeNumByScanRange(TScanRangeParams param) { + TScanRange scanRange = param.getScanRange(); + if (scanRange == null) { + LOG.warn("null scan range"); Review Comment: Useless log; remove it or make it meaningful. ########## be/src/runtime/fragment_mgr.cpp: ########## @@ -394,10 +394,12 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { // this is a load plan, and load is not finished, just make a brief report params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); + params.__set_finished_scan_ranges(req.runtime_state->num_finished_range()); Review Comment: I think we can call this no matter what type it is. ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + + private Map<Long, Progress> idToProgress = Maps.newConcurrentMap(); + + public void registerProgress(Long id, int scannerNum) { + LOG.info("create " + id + " with initial scannerNum " + scannerNum); + idToProgress.remove(id); + idToProgress.put(id, new Progress(scannerNum)); + } + + public void registerProgressSimple(Long id) { + registerProgress(id, 0); + } + + public void updateProgress(Long id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum); + } else { + LOG.warn("progress[" + id + "] missing meta infomartion"); + } + } + + public void addTotalScanNums(Long id, int num) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.addTotalScanNums(num); + } + } + + public void deregisterProgress(Long id) { + // TODO: deregister the progress + idToProgress.remove(id); + } + + public Progress getProgressClass(Long id) { + return idToProgress.get(id); + } + + public double getProgress(Long id) { + return idToProgress.get(id).getProgress(); + } + + static class Progress { + private Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create(); Review Comment: Add comment to explain the `rowkey`, `columnkey` and `value` in `finishedScanNums` ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 
+1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + + private Map<Long, Progress> idToProgress = Maps.newConcurrentMap(); Review Comment: Better to use `String` as the map's key, because not all jobs have an `Integer`-type id. ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +/** + * ProgressManager manage the progress of loading and exporting tasks + */ +public class ProgressManager { + private static final Logger LOG = LogManager.getLogger(LoadManager.class); + + private Map<Long, Progress> idToProgress = Maps.newConcurrentMap(); + + public void registerProgress(Long id, int scannerNum) { + LOG.info("create " + id + " with initial scannerNum " + scannerNum); + idToProgress.remove(id); + idToProgress.put(id, new Progress(scannerNum)); + } + + public void registerProgressSimple(Long id) { + registerProgress(id, 0); + } + + public void updateProgress(Long id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum); + } else { + LOG.warn("progress[" + id + "] missing meta infomartion"); + } + } + + public void addTotalScanNums(Long id, int num) { + Progress progress = idToProgress.get(id); + if (progress != null) { + progress.addTotalScanNums(num); + } + } + + public void 
deregisterProgress(Long id) { + // TODO: deregister the progress + idToProgress.remove(id); + } + + public Progress getProgressClass(Long id) { + return idToProgress.get(id); + } + + public double getProgress(Long id) { + return idToProgress.get(id).getProgress(); + } + + static class Progress { + private Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create(); + private int totalScanNums = 0; + + public synchronized void addTotalScanNums(int num) { + totalScanNums += num; + } + + public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) { + if (finishedScanNums.contains(queryId, fragmentId)) { + finishedScanNums.put(queryId, fragmentId, finishedScanNum); + } else { + finishedScanNums.put(queryId, fragmentId, finishedScanNum); + } + } + + public int getTotalScanNums() { + return totalScanNums; + } + + public int getFinishedScanNums() { + int result = 0; + for (Integer v : finishedScanNums.values()) { + result += v; + } + return result; + } + + public double getProgress() { + return getFinishedScanNums() * 100 / (double) totalScanNums; Review Comment: Handle `divide by zero` exception ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java: ########## @@ -182,6 +182,8 @@ private void actualExecute(Coordinator curCoordinator) throws Exception { curCoordinator.getErrorTabletInfos().clear(); // Create profile of this task and add to the job profile. createProfile(curCoordinator); + // Register current load job to ProgressManager + // progressManager.registerProgress(curCoordinator.getQueryId(), curCoordinator.getScanRangeNum()); Review Comment: Remove unused code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org