This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 2b8b199b871 [Fix](merge-cloud) fix broker load failure caused by no cluster in cloud mode (#31963)
2b8b199b871 is described below

commit 2b8b199b87103f2bb76106acc0a9cad8f987cdd3
Author: Luwei <814383...@qq.com>
AuthorDate: Thu Mar 7 23:02:28 2024 +0800

    [Fix](merge-cloud) fix broker load failure caused by no cluster in cloud mode (#31963)
---
 .../doris/cloud/load/CloudBrokerLoadJob.java       |  8 ++-
 .../doris/cloud/load/CloudLoadLoadingTask.java     | 81 ++++++++++++++++++++++
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  2 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  2 +-
 4 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
index 8381d2d0959..9fae8697b52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudBrokerLoadJob.java
@@ -96,15 +96,17 @@ public class CloudBrokerLoadJob extends BrokerLoadJob {
         }
     }
 
-    private LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
+    @Override
+    protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
             boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment)
             throws UserException {
-        LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
+        cloudClusterId = sessionVariables.get(CLOUD_CLUSTER_ID);
+        LoadLoadingTask task = new CloudLoadLoadingTask(db, table, brokerDesc,
                 brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
                 isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
                 getLoadParallelism(), getSendBatchParallelism(),
                 getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
-                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode);
+                useNewLoadScanNode(), getPriority(), isEnableMemtableOnSinkNode, cloudClusterId);
 
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
new file mode 100644
index 00000000000..912de173ecc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadLoadingTask.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.load;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.loadv2.LoadLoadingTask;
+import org.apache.doris.load.loadv2.LoadTaskCallback;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class CloudLoadLoadingTask extends LoadLoadingTask {
+    private static final Logger LOG = LogManager.getLogger(CloudLoadLoadingTask.class);
+
+    private String cloudClusterId;
+
+    public CloudLoadLoadingTask(Database db, OlapTable table,
+            BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
+            long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
+            long txnId, LoadTaskCallback callback, String timezone,
+            long timeoutS, int loadParallelism, int sendBatchParallelism,
+            boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
+            boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode, String clusterId) {
+        super(db, table, brokerDesc, fileGroups, jobDeadlineMs, execMemLimit, strictMode, isPartialUpdate,
+                txnId, callback, timezone, timeoutS, loadParallelism, sendBatchParallelism, loadZeroTolerance,
+                jobProfile, singleTabletLoadPerSink, useNewLoadScanNode, priority, enableMemTableOnSinkNode);
+        this.cloudClusterId = clusterId;
+    }
+
+    private AutoCloseConnectContext buildConnectContext() throws UserException {
+        String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
+                .getClusterNameByClusterId(this.cloudClusterId);
+        if (Strings.isNullOrEmpty(clusterName)) {
+            LOG.warn("cluster name is empty, cluster id is {}", this.cloudClusterId);
+            throw new UserException("cluster name is empty, cluster id is: " + this.cloudClusterId);
+        }
+
+        if (ConnectContext.get() == null) {
+            ConnectContext connectContext = new ConnectContext();
+            connectContext.setCloudCluster(clusterName);
+            return new AutoCloseConnectContext(connectContext);
+        } else {
+            ConnectContext.get().setCloudCluster(clusterName);
+            return null;
+        }
+    }
+
+    @Override
+    protected void executeOnce() throws Exception {
+        try (AutoCloseConnectContext r = buildConnectContext()) {
+            super.executeOnce();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 98bd1a9f10d..6c7535d6e06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -193,7 +193,7 @@ public class BrokerLoadJob extends BulkLoadJob {
         loadStartTimestamp = System.currentTimeMillis();
     }
 
-    private LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
+    protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
             boolean isEnableMemtableOnSinkNode, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment)
             throws UserException {
         LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index c389fe877d0..802c52fd144 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -139,7 +139,7 @@ public class LoadLoadingTask extends LoadTask {
         executeOnce();
     }
 
-    private void executeOnce() throws Exception {
+    protected void executeOnce() throws Exception {
         // New one query id,
         Coordinator curCoordinator = EnvFactory.getInstance().createCoordinator(callback.getCallbackId(),
                 loadId, planner.getDescTable(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org