gnehil commented on code in PR #36862: URL: https://github.com/apache/doris/pull/36862#discussion_r1714746863
########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/IngestionLoadJob.java: ########## @@ -0,0 +1,1138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DataQualityException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.QuotaExceedException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.MetaLockUtils; +import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.EtlStatus; +import org.apache.doris.load.FailMsg; +import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.sparkdpp.DppResult; +import org.apache.doris.sparkdpp.EtlJobConfig; +import org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTaskExecutor; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.task.PushTask; +import 
org.apache.doris.thrift.TBrokerRangeDesc; +import org.apache.doris.thrift.TBrokerScanRange; +import org.apache.doris.thrift.TBrokerScanRangeParams; +import org.apache.doris.thrift.TColumn; +import org.apache.doris.thrift.TDescriptorTable; +import org.apache.doris.thrift.TEtlState; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPriority; +import org.apache.doris.thrift.TPushType; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TabletQuorumFailedException; +import org.apache.doris.transaction.TransactionState; + +import cfjd.com.google.gson.annotations.SerializedName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import lombok.Setter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Ingestion Load + * </p> + * Load data file which has been pre-processed + * </p> + * There are 4 steps in IngestionLoadJob: + * Step1: Outside system execute ingestion etl job. + * Step2: LoadEtlChecker will check ingestion etl job status periodically + * and send push tasks to be when ingestion etl job is finished. + * Step3: LoadLoadingChecker will check loading status periodically and commit transaction when push tasks are finished. + * Step4: PublishVersionDaemon will send publish version tasks to be and finish transaction. 
+ */ +public class IngestionLoadJob extends LoadJob { + + public static final Logger LOG = LogManager.getLogger(IngestionLoadJob.class); + + private long etlStartTimestamp = -1; + + private long quorumFinishTimestamp = -1; + + private List<Long> loadTableIds = new ArrayList<>(); + + @Setter + @SerializedName("ests") + private EtlStatus etlStatus; + + private final Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap(); + + private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap(); + + // members below updated when job state changed to loading + // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) } + @SerializedName(value = "tm2fi") + private final Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap(); + + private final Map<Long, Integer> indexToSchemaHash = Maps.newHashMap(); + + private final Map<String, Long> filePathToSize = new HashMap<>(); + + private final Set<Long> finishedReplicas = Sets.newHashSet(); + private final Set<Long> quorumTablets = Sets.newHashSet(); + private final Set<Long> fullTablets = Sets.newHashSet(); + + private final List<TabletCommitInfo> commitInfos = Lists.newArrayList(); + + @SerializedName(value = "hp") + private final Map<String, String> hadoopProperties = new HashMap<>(); + + @SerializedName(value = "i2sv") + private final Map<Long, Integer> indexToSchemaVersion = new HashMap<>(); + + public IngestionLoadJob() { + super(EtlJobType.INGESTION); + } + + public IngestionLoadJob(long dbId, String label, List<String> tableNames, UserIdentity userInfo) + throws LoadException { + super(EtlJobType.INGESTION, dbId, label); + this.loadTableIds = getLoadTableIds(tableNames); + this.userInfo = userInfo; + } + + @Override + public Set<String> getTableNamesForShow() { + return Collections.emptySet(); + } + + @Override + public Set<String> getTableNames() throws MetaNotFoundException { + Set<String> result = Sets.newHashSet(); + Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + for (long tableId : loadTableIds) { + Table table = database.getTableOrMetaException(tableId); + result.add(table.getName()); + } + return result; + } + + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + super.afterVisible(txnState, txnOperated); + clearJob(); + } + + @Override + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) + throws UserException { + super.afterAborted(txnState, txnOperated, txnStatusChangeReason); + clearJob(); + } + + @Override + public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) { + super.cancelJobWithoutCheck(failMsg, abortTxn, needLog); + clearJob(); + } + + @Override + public void cancelJob(FailMsg failMsg) throws DdlException { + super.cancelJob(failMsg); + clearJob(); + } + + public List<Long> getLoadTableIds(List<String> tableNames) throws LoadException { + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s)); + List<Long> list = new ArrayList<>(tableNames.size()); + for (String tableName : tableNames) { + OlapTable olapTable = (OlapTable) db.getTableOrException(tableName, + s -> new LoadException("table does not exist. 
id: " + s)); + list.add(olapTable.getId()); + } + return list; + } + + protected long getEtlStartTimestamp() { + return etlStartTimestamp; + } + + public long beginTransaction() + throws BeginTransactionException, MetaNotFoundException, AnalysisException, QuotaExceedException, + LabelAlreadyUsedException, DuplicatedRequestException { + this.transactionId = Env.getCurrentGlobalTransactionMgr() + .beginTransaction(dbId, loadTableIds, label, null, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()), + TransactionState.LoadJobSourceType.FRONTEND, id, getTimeout()); + return transactionId; + } + + public Map<String, Object> getLoadMeta(Map<String, List<String>> tableToPartitionMap) + throws LoadException { + + if (tableToPartitionMap == null || tableToPartitionMap.isEmpty()) { + throw new IllegalArgumentException("tableToPartitionMap is empty"); + } + + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s)); + Map<String, Object> loadMeta = new HashMap<>(); + loadMeta.put("dbId", db.getId()); + Long signature = Env.getCurrentEnv().getNextId(); + loadMeta.put("signature", signature); + + List<Table> tables; + try { + tables = db.getTablesOnIdOrderOrThrowException(loadTableIds); + } catch (MetaNotFoundException e) { + throw new LoadException(e.getMessage()); + } + + MetaLockUtils.readLockTables(tables); + try { + Map<String, Map<String, Object>> tableMeta = new HashMap<>(tableToPartitionMap.size()); + for (Map.Entry<String, List<String>> entry : tableToPartitionMap.entrySet()) { + String tableName = entry.getKey(); + Map<String, Object> meta = tableMeta.getOrDefault(tableName, new HashMap<>()); + OlapTable olapTable = (OlapTable) db.getTableOrException(tableName, + s -> new LoadException("table does not exist. 
id: " + s)); + meta.put("id", olapTable.getId()); + List<EtlJobConfig.EtlIndex> indices = createEtlIndexes(olapTable); + meta.put("indexes", indices); + List<String> partitionNames = entry.getValue(); + Set<Long> partitionIds; + if (partitionNames != null && !partitionNames.isEmpty()) { + partitionIds = new HashSet<>(partitionNames.size()); + for (String partitionName : partitionNames) { + Partition partition = olapTable.getPartition(partitionName); + if (partition == null) { + throw new LoadException(String.format("partition %s is not exists", partitionName)); + } + partitionIds.add(partition.getId()); + } + } else { + partitionIds = + olapTable.getAllPartitions().stream().map(Partition::getId).collect(Collectors.toSet()); + } + EtlJobConfig.EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(olapTable, partitionIds); + meta.put("partitionInfo", etlPartitionInfo); + tableMeta.put(tableName, meta); + + if (tableToLoadPartitions.containsKey(olapTable.getId())) { + tableToLoadPartitions.get(olapTable.getId()).addAll(partitionIds); + } else { + tableToLoadPartitions.put(olapTable.getId(), partitionIds); + } + + } + loadMeta.put("tableMeta", tableMeta); + } finally { + MetaLockUtils.readUnlockTables(tables); + } + return loadMeta; + + } + + private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable table) throws LoadException { + List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList(); + + for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) { + long indexId = entry.getKey(); + // todo(liheng): get schema hash and version from materialized index meta directly + MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId); + int schemaHash = indexMeta.getSchemaHash(); + int schemaVersion = indexMeta.getSchemaVersion(); + + boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS) + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite(); + + // columns + List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList(); + for (Column column : entry.getValue()) { + etlColumns.add(createEtlColumn(column, changeAggType)); + } + + // check distribution type + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.HASH) { + // RANDOM not supported + String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + // index type + String indexType; + KeysType keysType = table.getKeysTypeByIndexId(indexId); + switch (keysType) { + case DUP_KEYS: + indexType = "DUPLICATE"; + break; + case AGG_KEYS: + indexType = "AGGREGATE"; + break; + case UNIQUE_KEYS: + indexType = "UNIQUE"; + break; + default: + String errMsg = "unknown keys type. 
type: " + keysType.name(); + LOG.warn(errMsg); + throw new LoadException(errMsg); + } + + indexToSchemaVersion.put(indexId, schemaVersion); + + etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType, + indexId == table.getBaseIndexId(), schemaVersion)); + } + + return etlIndexes; + } + + private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean changeAggType) { + // column name + String name = column.getName().toLowerCase(Locale.ROOT); + // column type + PrimitiveType type = column.getDataType(); + String columnType = column.getDataType().toString(); + // is allow null + boolean isAllowNull = column.isAllowNull(); + // is key + boolean isKey = column.isKey(); + + // aggregation type + String aggregationType = null; + if (column.getAggregationType() != null) { + if (changeAggType && !column.isKey()) { + aggregationType = AggregateType.REPLACE.toSql(); + } else { + aggregationType = column.getAggregationType().toString(); + } + } + + // default value + String defaultValue = null; + if (column.getDefaultValue() != null) { + defaultValue = column.getDefaultValue(); + } + if (column.isAllowNull() && column.getDefaultValue() == null) { + defaultValue = "\\N"; + } + + // string length + int stringLength = 0; + if (type.isStringType()) { + stringLength = column.getStrLen(); + } + + // decimal precision scale + int precision = 0; + int scale = 0; + if (type.isDecimalV2Type() || type.isDecimalV3Type()) { + precision = column.getPrecision(); + scale = column.getScale(); + } + + return new EtlJobConfig.EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue, + stringLength, precision, scale); + } + + private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds) + throws LoadException { + PartitionType type = table.getPartitionInfo().getType(); + + List<String> partitionColumnRefs = Lists.newArrayList(); + List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList(); + if (type == PartitionType.RANGE) { + RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo(); + for (Column column : rangePartitionInfo.getPartitionColumns()) { + partitionColumnRefs.add(column.getName()); + } + + for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) { + long partitionId = entry.getKey(); + if (!partitionIds.contains(partitionId)) { + continue; + } + + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. 
id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + // is max partition + Range<PartitionKey> range = entry.getValue().getItems(); + boolean isMaxPartition = range.upperEndpoint().isMaxValue(); + + // start keys + List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys(); + List<Object> startKeys = Lists.newArrayList(); + for (LiteralExpr literalExpr : rangeKeyExprs) { + Object keyValue = literalExpr.getRealValue(); + startKeys.add(keyValue); + } + + // end keys + // is empty list when max partition + List<Object> endKeys = Lists.newArrayList(); + if (!isMaxPartition) { + rangeKeyExprs = range.upperEndpoint().getKeys(); + for (LiteralExpr literalExpr : rangeKeyExprs) { + Object keyValue = literalExpr.getRealValue(); + endKeys.add(keyValue); + } + } + + etlPartitions.add( + new EtlJobConfig.EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum)); + } + } else if (type == PartitionType.UNPARTITIONED) { + Preconditions.checkState(partitionIds.size() == 1, "partition size must be eqauls to 1"); + + for (Long partitionId : partitionIds) { + Partition partition = table.getPartition(partitionId); + if (partition == null) { + throw new LoadException("partition does not exist. id: " + partitionId); + } + + // bucket num + int bucketNum = partition.getDistributionInfo().getBucketNum(); + + etlPartitions.add(new EtlJobConfig.EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(), + true, bucketNum)); + } + } else { + throw new LoadException("Spark Load does not support list partition yet"); + } + + // distribution column refs + List<String> distributionColumnRefs = Lists.newArrayList(); + DistributionInfo distributionInfo = table.getDefaultDistributionInfo(); + Preconditions.checkState(distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH); + for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) { + distributionColumnRefs.add(column.getName()); + } + + return new EtlJobConfig.EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs, + etlPartitions); + } + + public void updateEtlStatus() throws Exception { + + if (!checkState(JobState.ETL) || etlStatus == null) { + return; + } + + writeLock(); + try { + switch (etlStatus.getState()) { + case FINISHED: + unprotectedProcessEtlFinish(); + break; + case CANCELLED: + throw new LoadException("spark etl job failed. msg: " + etlStatus.getFailMsg()); + default: + break; + } + } finally { + writeUnlock(); + } + + if (checkState(JobState.LOADING)) { + submitPushTasks(); + } + + } + + private boolean checkState(JobState expectState) { + readLock(); + try { + return state == expectState; + } finally { + readUnlock(); + } + } + + private Set<Long> submitPushTasks() throws UserException { + + // check db exist + Database db = null; + try { + db = getDb(); + } catch (MetaNotFoundException e) { + String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label) + .add("error_msg", "db has been deleted when job is loading").build(); + throw new MetaNotFoundException(errMsg); + } + + AgentBatchTask batchTask = new AgentBatchTask(); + boolean hasLoadPartitions = false; + Set<Long> totalTablets = Sets.newHashSet(); + List<? extends TableIf> tableList = db.getTablesOnIdOrderOrThrowException( + Lists.newArrayList(tableToLoadPartitions.keySet())); + MetaLockUtils.readLockTables(tableList); + try { + writeLock(); + try { + // check state is still loading. 
If state is cancelled or finished, return. + // if state is cancelled or finished and not return, + // this would throw all partitions have no load data exception, + // because tableToLoadPartitions was already cleaned up, + if (state != JobState.LOADING) { + LOG.warn("job state is not loading. job id: {}, state: {}", id, state); + return totalTablets; + } + + for (TableIf table : tableList) { + Set<Long> partitionIds = tableToLoadPartitions.get(table.getId()); + OlapTable olapTable = (OlapTable) table; + for (long partitionId : partitionIds) { + Partition partition = olapTable.getPartition(partitionId); + if (partition == null) { + LOG.warn("partition does not exist. id: {}", partitionId); + continue; + } + + hasLoadPartitions = true; + int quorumReplicaNum = + olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum() / 2 + + 1; + + List<MaterializedIndex> indexes = partition.getMaterializedIndices( + MaterializedIndex.IndexExtState.ALL); + for (MaterializedIndex index : indexes) { + long indexId = index.getId(); + MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexId); + int schemaVersion = indexMeta.getSchemaVersion(); + int schemaHash = indexMeta.getSchemaHash(); + + // check schemaHash and schemaVersion whether is changed + checkIndexSchema(indexId, schemaHash, schemaVersion); + + int bucket = 0; + for (Tablet tablet : index.getTablets()) { + long tabletId = tablet.getId(); + totalTablets.add(tabletId); + Set<Long> tabletAllReplicas = Sets.newHashSet(); + Set<Long> tabletFinishedReplicas = Sets.newHashSet(); + for (Replica replica : tablet.getReplicas()) { + long replicaId = replica.getId(); + tabletAllReplicas.add(replicaId); + if (!tabletToSentReplicaPushTask.containsKey(tabletId) + || !tabletToSentReplicaPushTask.get(tabletId).containsKey(replicaId)) { + long backendId = replica.getBackendId(); + long taskSignature = Env.getCurrentGlobalTransactionMgr() + .getNextTransactionId(); + + PushTask pushTask = + buildPushTask(backendId, olapTable, taskSignature, partitionId, indexId, + tabletId, replicaId, schemaHash, schemaVersion, bucket++); + if (AgentTaskQueue.addTask(pushTask)) { + batchTask.addTask(pushTask); + if (!tabletToSentReplicaPushTask.containsKey(tabletId)) { + tabletToSentReplicaPushTask.put(tabletId, Maps.newHashMap()); + } + tabletToSentReplicaPushTask.get(tabletId).put(replicaId, pushTask); + } + } + + if (finishedReplicas.contains(replicaId) && replica.getLastFailedVersion() < 0) { + tabletFinishedReplicas.add(replicaId); + } + } + + if (tabletAllReplicas.isEmpty()) { + LOG.error("invalid situation. tablet is empty. 
id: {}", tabletId); + } + + // check tablet push states + if (tabletFinishedReplicas.size() >= quorumReplicaNum) { + quorumTablets.add(tabletId); + if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) { + fullTablets.add(tabletId); + } + } + } + } + } + } + + if (batchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(batchTask); + } + + if (!hasLoadPartitions) { + String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label) + .add("error_msg", "all partitions have no load data").build(); + throw new LoadException(errMsg); + } + + return totalTablets; + } finally { + writeUnlock(); + } + } finally { + MetaLockUtils.readUnlockTables(tableList); + } + + } + + public void updateJobStatus(Map<String, String> statusInfo) { + + updateState(statusInfo.get("status"), statusInfo.get("msg")); + + etlStatus.setTrackingUrl(statusInfo.get("appId")); + etlStatus.setProgress(progress); + + if (etlStatus.getState() == TEtlState.FINISHED) { + Gson gson = new Gson(); + DppResult dppResult = gson.fromJson(statusInfo.get("dppResult"), DppResult.class); + loadStatistic.fileNum = (int) dppResult.fileNumber; + loadStatistic.totalFileSizeB = dppResult.fileSize; + TUniqueId dummyId = new TUniqueId(0, 0); + long dummyBackendId = -1L; + loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId)); + loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows, + dppResult.scannedBytes, true); + loadingStatus.setDppResult(dppResult); + Map<String, String> counters = loadingStatus.getCounters(); + counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows)); + counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows)); + counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows)); + filePathToSize.putAll( + gson.fromJson(statusInfo.get("filePathToSize"), new TypeToken<HashMap<String, Long>>() { + })); + hadoopProperties.putAll( + gson.fromJson(statusInfo.get("hadoopProperties"), new TypeToken<HashMap<String, String>>() { + })); + } + + } + + private void updateState(String stateStr, String msg) { + + switch (stateStr.toLowerCase()) { + case "running": + etlStatus.setState(TEtlState.RUNNING); + break; + case "success": + etlStatus.setState(TEtlState.FINISHED); + break; + case "failed": + boolean res = etlStatus.setState(TEtlState.CANCELLED); + if (!res) { + etlStatus = new EtlStatus(); + etlStatus.setState(TEtlState.CANCELLED); + } + etlStatus.setFailMsg(msg); + break; + default: + etlStatus.setState(TEtlState.UNKNOWN); + break; + } + + } + + public void startEtlJob() { + etlStartTimestamp = System.currentTimeMillis(); + state = JobState.ETL; + etlStatus = new EtlStatus(); + } + + private void unprotectedUpdateToLoadingState(EtlStatus etlStatus, Map<String, Long> filePathToSize) + throws LoadException { + try { + for (Map.Entry<String, Long> entry : filePathToSize.entrySet()) { + String filePath = entry.getKey(); + if (!filePath.endsWith(EtlJobConfig.ETL_OUTPUT_FILE_FORMAT)) { + continue; + } + String tabletMetaStr = EtlJobConfig.getTabletMetaStr(filePath); + tabletMetaToFileInfo.put(tabletMetaStr, Pair.of(filePath, entry.getValue())); + } + + loadingStatus = etlStatus; + progress = 0; + Env.getCurrentProgressManager().registerProgressSimple(String.valueOf(id)); + unprotectedUpdateState(JobState.LOADING); + LOG.info("update to {} state success. job id: {}", state, id); + } catch (Exception e) { + LOG.warn("update to {} state failed. 
job id: {}", state, id, e); + throw new LoadException(e.getMessage(), e); + } + } + + private void unprotectedPrepareLoadingInfos() { + for (String tabletMetaStr : tabletMetaToFileInfo.keySet()) { + String[] fileNameArr = tabletMetaStr.split("\\."); + // tableId.partitionId.indexId.bucket.schemaHash + Preconditions.checkState(fileNameArr.length == 5); + long tableId = Long.parseLong(fileNameArr[0]); + long partitionId = Long.parseLong(fileNameArr[1]); + long indexId = Long.parseLong(fileNameArr[2]); + int schemaHash = Integer.parseInt(fileNameArr[4]); + + if (!tableToLoadPartitions.containsKey(tableId)) { + tableToLoadPartitions.put(tableId, Sets.newHashSet()); + } + tableToLoadPartitions.get(tableId).add(partitionId); + + indexToSchemaHash.put(indexId, schemaHash); + } + } + + private void unprotectedProcessEtlFinish() throws Exception { + // checkDataQuality + if (!checkDataQuality()) { + throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG); + } + + // get etl output files and update loading state + unprotectedUpdateToLoadingState(etlStatus, filePathToSize); + // log loading state + unprotectedLogUpdateStateInfo(); + // prepare loading infos + unprotectedPrepareLoadingInfos(); + } + + private TBrokerScanRange getTBrokerScanRange(DescriptorTable descTable, TupleDescriptor destTupleDesc, + List<Column> columns, Map<String, String> properties) + throws AnalysisException { + + TBrokerScanRange brokerScanRange = new TBrokerScanRange(); + + TBrokerScanRangeParams params = new TBrokerScanRangeParams(); + params.setStrictMode(false); + params.setProperties(properties); + TupleDescriptor srcTupleDesc = descTable.createTupleDescriptor(); + Map<String, SlotDescriptor> srcSlotDescByName = Maps.newHashMap(); + for (Column column : columns) { + SlotDescriptor srcSlotDesc = descTable.addSlotDescriptor(srcTupleDesc); + srcSlotDesc.setIsMaterialized(true); + srcSlotDesc.setIsNullable(true); + + if (column.getDataType() == PrimitiveType.BITMAP) { + // cast to bitmap when the target column type is bitmap + srcSlotDesc.setType(ScalarType.createType(PrimitiveType.BITMAP)); + srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.BITMAP)); + } else { + srcSlotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR)); + } + + params.addToSrcSlotIds(srcSlotDesc.getId().asInt()); + srcSlotDescByName.put(column.getName(), srcSlotDesc); + } + + Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap(); + for (SlotDescriptor destSlotDesc : destTupleDesc.getSlots()) { + if (!destSlotDesc.isMaterialized()) { + continue; + } + + SlotDescriptor srcSlotDesc = srcSlotDescByName.get(destSlotDesc.getColumn().getName()); + destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt()); + Expr expr = new SlotRef(srcSlotDesc); + expr = castToSlot(destSlotDesc, expr); + params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift()); + } + params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans); + params.setSrcTupleId(srcTupleDesc.getId().asInt()); + params.setDestTupleId(destTupleDesc.getId().asInt()); + brokerScanRange.setParams(params); + + // broker address updated for each replica + brokerScanRange.setBrokerAddresses(Lists.newArrayList()); + + // broker range desc + TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc(); + tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS); + tBrokerRangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET); + 
tBrokerRangeDesc.setSplittable(false); + tBrokerRangeDesc.setStartOffset(0); + tBrokerRangeDesc.setSize(-1); + // path and file size updated for each replica + brokerScanRange.setRanges(Collections.singletonList(tBrokerRangeDesc)); + + return brokerScanRange; + + } + + private Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws AnalysisException { + PrimitiveType dstType = slotDesc.getType().getPrimitiveType(); + PrimitiveType srcType = expr.getType().getPrimitiveType(); + if (dstType == PrimitiveType.BOOLEAN && srcType == PrimitiveType.VARCHAR) { + // there is no cast VARCHAR to BOOLEAN function, + // so we cast VARCHAR to TINYINT first, then cast TINYINT to BOOLEAN + return new CastExpr(Type.BOOLEAN, new CastExpr(Type.TINYINT, expr)); + } + if (dstType != srcType) { + return expr.castTo(slotDesc.getType()); + } + return expr; + } + + private TDescriptorTable getTDescriptorTable(DescriptorTable descTable) { + descTable.computeStatAndMemLayout(); + return descTable.toThrift(); + } + + private PushTask buildPushTask(long backendId, OlapTable olapTable, long taskSignature, long partitionId, + long indexId, long tabletId, long replicaId, int schemaHash, int schemaVersion, + long bucket) + throws AnalysisException { + + DescriptorTable descTable = new DescriptorTable(); + TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); + + List<TColumn> columnsDesc = new ArrayList<>(); + List<Column> columns = new ArrayList<>(); + for (Column column : olapTable.getSchemaByIndexId(indexId)) { + Column col = new Column(column); + col.setName(column.getName().toLowerCase(Locale.ROOT)); + columns.add(col); + columnsDesc.add(col.toThrift()); + // use index schema to fill the descriptor table + SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); + destSlotDesc.setIsMaterialized(true); + destSlotDesc.setColumn(col); + destSlotDesc.setIsNullable(col.isAllowNull()); + } + + // deep copy TBrokerScanRange because filePath and fileSize will be updated + // in different tablet push task + TBrokerScanRange tBrokerScanRange = + getTBrokerScanRange(descTable, destTupleDesc, columns, hadoopProperties); + // update filePath fileSize + TBrokerRangeDesc tBrokerRangeDesc = tBrokerScanRange.getRanges().get(0); + tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS); + tBrokerRangeDesc.setPath(""); + tBrokerRangeDesc.setFileSize(-1); + String tabletMetaStr = String.format("%d.%d.%d.%d.%d", olapTable.getId(), partitionId, + indexId, bucket, schemaHash); + if (tabletMetaToFileInfo.containsKey(tabletMetaStr)) { + Pair<String, Long> fileInfo = tabletMetaToFileInfo.get(tabletMetaStr); + tBrokerRangeDesc.setPath(fileInfo.first); + tBrokerRangeDesc.setFileSize(fileInfo.second); + } + + TDescriptorTable tDescriptorTable = getTDescriptorTable(descTable); + + return new PushTask(backendId, dbId, olapTable.getId(), + partitionId, indexId, tabletId, replicaId, schemaHash, 0, id, + TPushType.LOAD_V2, TPriority.NORMAL, transactionId, taskSignature, + tBrokerScanRange, tDescriptorTable, columnsDesc, + olapTable.getStorageVaultId(), schemaVersion); + } + + public void updateLoadingStatus() throws UserException { + if (!checkState(JobState.LOADING)) { + return; + } + + if (etlStatus.getState() == TEtlState.CANCELLED) { + throw new LoadException(etlStatus.getFailMsg()); + } + + // submit push tasks + Set<Long> totalTablets = submitPushTasks(); Review Comment: `tabletToSentReplicaPushTask` records the tablets that have been sent and the push tasks of the replicas. 
Before building a new push task, this map is checked to see whether a task has already been sent for that replica, so the same replica is not pushed again.
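To make the dedup behavior concrete, here is a minimal sketch of the check-before-send pattern that `submitPushTasks()` applies per replica. This is illustrative only: the standalone class, the `sendOnce` helper, and the placeholder `PushTask` are simplified stand-ins, not the PR code itself.

```java
// Sketch of the dedup pattern: tabletToSentReplicaPushTask maps
// tabletId -> (replicaId -> PushTask), so a push task is built and
// queued at most once per replica, even if the check runs repeatedly.
import java.util.HashMap;
import java.util.Map;

class PushTaskDedupSketch {
    // Placeholder for org.apache.doris.task.PushTask.
    static class PushTask { }

    private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = new HashMap<>();

    /** Returns a new task if this replica has not been pushed yet, otherwise null. */
    PushTask sendOnce(long tabletId, long replicaId) {
        Map<Long, PushTask> sent =
                tabletToSentReplicaPushTask.computeIfAbsent(tabletId, k -> new HashMap<>());
        if (sent.containsKey(replicaId)) {
            // A push task was already sent to this replica; skip it.
            return null;
        }
        PushTask task = new PushTask(); // in the job this is buildPushTask(...)
        sent.put(replicaId, task);      // record it so later passes do not resend
        return task;
    }
}
```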