This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 81291b901559b81704fcf8a52ac0596f891e2c68
Author: luozenglin <luozeng...@baidu.com>
AuthorDate: Fri Aug 4 16:52:51 2023 +0800

    [fix](compatibility) Version 1.2 upgraded to 2.0 compatible with miniload 
metadata (#22590)
---
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   2 +
 .../org/apache/doris/load/loadv2/LoadManager.java  |  26 ++----
 .../org/apache/doris/load/loadv2/MiniLoadJob.java  | 100 +++++++++++++++++++++
 3 files changed, 107 insertions(+), 21 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 6404fe8009..4c8528abd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -865,6 +865,8 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
             job = new SparkLoadJob();
         } else if (type == EtlJobType.INSERT) {
             job = new InsertLoadJob();
+        } else if (type == EtlJobType.MINI) {
+            job = new MiniLoadJob();
         } else {
             throw new IOException("Unknown load type: " + type.name());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 73bc6f6054..278e23bba5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -51,7 +51,6 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.DatabaseTransactionMgr;
-import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
@@ -815,7 +814,8 @@ public class LoadManager implements Writable {
     public void write(DataOutput out) throws IOException {
         long currentTimeMs = System.currentTimeMillis();
         List<LoadJob> loadJobs =
-                idToLoadJob.values().stream().filter(t -> 
!t.isExpired(currentTimeMs)).collect(Collectors.toList());
+                idToLoadJob.values().stream().filter(t -> 
!t.isExpired(currentTimeMs))
+                        .filter(t -> !(t instanceof 
MiniLoadJob)).collect(Collectors.toList());
 
         out.writeInt(loadJobs.size());
         for (LoadJob loadJob : loadJobs) {
@@ -836,25 +836,9 @@ public class LoadManager implements Writable {
             }
 
             if (loadJob.getJobType() == EtlJobType.MINI) {
-                // This is a bug fix. the mini load job should not with state 
LOADING.
-                if (loadJob.getState() == JobState.LOADING) {
-                    LOG.warn("skip mini load job {} in db {} with LOADING 
state", loadJob.getId(), loadJob.getDbId());
-                    continue;
-                }
-
-                if (loadJob.getState() == JobState.PENDING) {
-                    // bad case. When a mini load job is created and then FE 
restart.
-                    // the job will be in PENDING state forever.
-                    // This is a temp solution to remove these jobs.
-                    // And the mini load job should be deprecated in Doris v1.1
-                    TransactionState state = 
Env.getCurrentEnv().getGlobalTransactionMgr()
-                            .getTransactionState(loadJob.getDbId(), 
loadJob.getTransactionId());
-                    if (state == null) {
-                        LOG.warn("skip mini load job {} in db {} with PENDING 
state and with txn: {}", loadJob.getId(),
-                                loadJob.getDbId(), loadJob.getTransactionId());
-                        continue;
-                    }
-                }
+                LOG.warn("skip mini load job {} in db {} as it is no longer 
supported", loadJob.getId(),
+                        loadJob.getDbId());
+                continue;
             }
             idToLoadJob.put(loadJob.getId(), loadJob);
             Map<String, List<LoadJob>> map = 
dbIdToLabelToLoadJobs.get(loadJob.getDbId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
new file mode 100644
index 0000000000..514a38ccb5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.catalog.AuthorizationInfo;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.transaction.TransactionState;
+
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+@Deprecated
+public class MiniLoadJob extends LoadJob { // LoadJob subclass for EtlJobType.MINI; adds a table name to the persisted form
+    private static final Logger LOG = LogManager.getLogger(MiniLoadJob.class);
+
+    private String tableName; // written by write() and restored by readFields()
+
+    private long tableId; // NOTE(review): never read, written, or serialized anywhere in this class
+
+    public MiniLoadJob() { // no-arg constructor; only tags the job with the MINI type
+        super(EtlJobType.MINI);
+    }
+
+    @Override
+    public Set<String> getTableNamesForShow() { // returns a new single-element set holding tableName
+        return Sets.newHashSet(tableName);
+    }
+
+    @Override
+    public Set<String> getTableNames() throws MetaNotFoundException { // same result as getTableNamesForShow(); no lookup is done here
+        return Sets.newHashSet(tableName);
+    }
+
+    @Override
+    public void beginTxn() { // intentionally empty: this class does nothing in beginTxn
+    }
+
+    @Override
+    protected void replayTxnAttachment(TransactionState txnState) { // delegates to updateLoadingStatue
+        updateLoadingStatue(txnState);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException { // parent fields first, then tableName
+        super.write(out);
+        Text.writeString(out, tableName);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException { // must mirror the field order used by write()
+        super.readFields(in);
+        tableName = Text.readString(in);
+    }
+
+    public AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { // pairs the database's full name with this job's table names
+        Database database = 
Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+        return new AuthorizationInfo(database.getFullName(), getTableNames());
+    }
+
+    private void updateLoadingStatue(TransactionState txnState) { // NOTE(review): "Statue" looks like a typo for "Status"
+        MiniLoadTxnCommitAttachment miniLoadTxnCommitAttachment =
+                (MiniLoadTxnCommitAttachment) 
txnState.getTxnCommitAttachment();
+        if (miniLoadTxnCommitAttachment == null) {
+            // an aborted txn may have no attachment; log it and leave loadingStatus untouched
+            LOG.info("no miniLoadTxnCommitAttachment, txn id: {} status: {}", 
txnState.getTransactionId(),
+                    txnState.getTransactionStatus());
+            return;
+        }
+        loadingStatus.replaceCounter(DPP_ABNORMAL_ALL, 
String.valueOf(miniLoadTxnCommitAttachment.getFilteredRows()));
+        loadingStatus.replaceCounter(DPP_NORMAL_ALL, 
String.valueOf(miniLoadTxnCommitAttachment.getLoadedRows()));
+        if (miniLoadTxnCommitAttachment.getErrorLogUrl() != null) { // set the tracking URL only when the attachment has an error log URL
+            
loadingStatus.setTrackingUrl(miniLoadTxnCommitAttachment.getErrorLogUrl());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to