morningman commented on a change in pull request #8030:
URL: https://github.com/apache/incubator-doris/pull/8030#discussion_r805171715



##########
File path: fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
##########
@@ -1943,16 +1776,7 @@ public void cancel(CancelStmt stmt) throws DdlException {
             List<AlterJobV2> schemaChangeJobV2List = getUnfinishedAlterJobV2ByTableId(olapTable.getId());
             // current schemaChangeJob job doesn't support batch operation,so just need to get one job
             schemaChangeJobV2 = schemaChangeJobV2List.size() == 0 ? null : Iterables.getOnlyElement(schemaChangeJobV2List);
-            if (schemaChangeJobV2 == null) {
-                schemaChangeJob = getAlterJob(olapTable.getId());
-                Preconditions.checkNotNull(schemaChangeJob, "Table[" + tableName + "] is not under SCHEMA_CHANGE.");
-                if (schemaChangeJob.getState() == JobState.FINISHING
-                        || schemaChangeJob.getState() == JobState.FINISHED
-                        || schemaChangeJob.getState() == JobState.CANCELLED) {
-                    throw new DdlException("job is already " + schemaChangeJob.getState().name() + ", can not cancel it");
-                }
-                schemaChangeJob.cancel(olapTable, "user cancelled");
-            }
+            Preconditions.checkNotNull(schemaChangeJobV2, "Table[" + tableName + "] is not under SCHEMA_CHANGE.");

Review comment:
       How about throwing a `DdlException` here, just like the rollup job does?
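   For example, something along these lines (just a sketch, assuming the same error wording the rollup handler uses; not the exact existing code):
```java
            if (schemaChangeJobV2 == null) {
                throw new DdlException("Table[" + tableName + "] is not under SCHEMA_CHANGE.");
            }
```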

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -1815,52 +1813,22 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException
     }
 
     public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException {
-        Map<Long, AlterJob> alterJobs = null;
-        ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = null;
         Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
-        if (type == JobType.ROLLUP) {
-            alterJobs = this.getRollupHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getRollupHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        } else if (type == JobType.SCHEMA_CHANGE) {
-            alterJobs = this.getSchemaChangeHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getSchemaChangeHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
-        } else if (type == JobType.DECOMMISSION_BACKEND) {
-            alterJobs = this.getClusterHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getClusterHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        }
 
         // alter jobs
         int size = dis.readInt();
         long newChecksum = checksum ^ size;
-        for (int i = 0; i < size; i++) {
-            long tableId = dis.readLong();
-            newChecksum ^= tableId;
-            AlterJob job = AlterJob.read(dis);
-            alterJobs.put(tableId, job);
-
-            // init job
-            Database db = getDbNullable(job.getDbId());
-            // should check job state here because the job is finished but not removed from alter jobs list
-            if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
-                    || job.getState() == org.apache.doris.alter.AlterJob.JobState.RUNNING)) {
-                job.replayInitJob(db);
-            }
+        if (size > 0) {
+            // There should be no old alter jobs, if exist throw exception, should not use this FE version
+            throw new IOException("There are [" + size + "] old alter jobs, should not happen");

Review comment:
       ```suggestion
                throw new IOException("There are [" + size + "] old alter jobs. Please downgrade FE to an older version and handle residual jobs");
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
##########
@@ -61,7 +61,7 @@ public boolean isFinalState() {
     }
 
     public enum JobType {
-        ROLLUP, SCHEMA_CHANGE
+        ROLLUP, SCHEMA_CHANGE, DECOMMISSION_BACKEND

Review comment:
       `DECOMMISSION_BACKEND`, do we need this?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2160,43 +2128,19 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE
     }
 
     public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
-        Map<Long, AlterJob> alterJobs = null;
-        ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = null;
         Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
-        if (type == JobType.ROLLUP) {
-            alterJobs = this.getRollupHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getRollupHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getRollupHandler().getAlterJobsV2();
-        } else if (type == JobType.SCHEMA_CHANGE) {
-            alterJobs = this.getSchemaChangeHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getSchemaChangeHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
-        } else if (type == JobType.DECOMMISSION_BACKEND) {
-            alterJobs = this.getClusterHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getClusterHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        }
 
-        // alter jobs
-        int size = alterJobs.size();
+        // alter jobs == 0
+        // If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
+        // the number of old version alter jobs has to be 0
+        int size = 0;
         checksum ^= size;
         dos.writeInt(size);
-        for (Entry<Long, AlterJob> entry : alterJobs.entrySet()) {
-            long tableId = entry.getKey();
-            checksum ^= tableId;
-            dos.writeLong(tableId);
-            entry.getValue().write(dos);
-        }
 
         // finished or cancelled jobs
-        size = finishedOrCancelledAlterJobs.size();
+        size = 0;

Review comment:
       We don't need to write this size now.
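   If we drop the legacy writes entirely, the load side has to drop the matching reads too. A small standalone sketch of that symmetry (hypothetical `AlterJobImageSketch`, with plain strings standing in for `AlterJobV2`; not the actual `Catalog` code):
```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the trimmed image format: only the V2 alter-job
// count (and the jobs themselves) are serialized; the legacy "old alter job"
// size fields are dropped from both save() and load(), keeping the layout symmetric.
public class AlterJobImageSketch {

    static long save(DataOutputStream dos, long checksum, Map<Long, String> alterJobsV2) throws IOException {
        int size = alterJobsV2.size();
        checksum ^= size;
        dos.writeInt(size); // V2 jobs only; no legacy counts are written
        for (Map.Entry<Long, String> e : alterJobsV2.entrySet()) {
            checksum ^= e.getKey();
            dos.writeLong(e.getKey());
            dos.writeUTF(e.getValue());
        }
        return checksum;
    }

    static long load(DataInputStream dis, long checksum, Map<Long, String> alterJobsV2) throws IOException {
        int size = dis.readInt(); // must mirror save(): read exactly what was written
        checksum ^= size;
        for (int i = 0; i < size; i++) {
            long id = dis.readLong();
            checksum ^= id;
            alterJobsV2.put(id, dis.readUTF());
        }
        return checksum;
    }

    public static void main(String[] args) throws IOException {
        Map<Long, String> jobs = new HashMap<>();
        jobs.put(1001L, "SCHEMA_CHANGE job");

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        long saved = save(new DataOutputStream(buf), 0L, jobs);

        Map<Long, String> restored = new HashMap<>();
        long loaded = load(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), 0L, restored);

        System.out.println(saved == loaded && jobs.equals(restored)); // prints: true
    }
}
```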

##########
File path: fe/fe-core/src/main/java/org/apache/doris/alter/DecommissionType.java
##########
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.alter;
+
+public enum DecommissionType {
+    SystemDecommission, // after finished system decommission, the backend will be removed from Palo.
+    ClusterDecommission // after finished cluster decommission, the backend will be removed from cluster.

Review comment:
       There is no `ClusterDecommission` now, so I think we should remove it too.
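   After that removal the enum would be reduced to something like this (sketch only, keeping the license header as-is):
```java
package org.apache.doris.alter;

public enum DecommissionType {
    SystemDecommission // after the system decommission finishes, the backend is removed from Palo.
}
```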

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2160,43 +2128,19 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE
     }
 
     public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
-        Map<Long, AlterJob> alterJobs = null;
-        ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = null;
         Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
-        if (type == JobType.ROLLUP) {
-            alterJobs = this.getRollupHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getRollupHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getRollupHandler().getAlterJobsV2();
-        } else if (type == JobType.SCHEMA_CHANGE) {
-            alterJobs = this.getSchemaChangeHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getSchemaChangeHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
-        } else if (type == JobType.DECOMMISSION_BACKEND) {
-            alterJobs = this.getClusterHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getClusterHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        }
 
-        // alter jobs
-        int size = alterJobs.size();
+        // alter jobs == 0
+        // If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
+        // the number of old version alter jobs has to be 0
+        int size = 0;

Review comment:
       We don't need to write this `size` now.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -1815,52 +1813,22 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException
     }
 
     public long loadAlterJob(DataInputStream dis, long checksum, JobType type) throws IOException {
-        Map<Long, AlterJob> alterJobs = null;
-        ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = null;
         Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
-        if (type == JobType.ROLLUP) {
-            alterJobs = this.getRollupHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getRollupHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        } else if (type == JobType.SCHEMA_CHANGE) {
-            alterJobs = this.getSchemaChangeHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getSchemaChangeHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
-        } else if (type == JobType.DECOMMISSION_BACKEND) {
-            alterJobs = this.getClusterHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getClusterHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        }
 
         // alter jobs
         int size = dis.readInt();
         long newChecksum = checksum ^ size;
-        for (int i = 0; i < size; i++) {
-            long tableId = dis.readLong();
-            newChecksum ^= tableId;
-            AlterJob job = AlterJob.read(dis);
-            alterJobs.put(tableId, job);
-
-            // init job
-            Database db = getDbNullable(job.getDbId());
-            // should check job state here because the job is finished but not removed from alter jobs list
-            if (db != null && (job.getState() == org.apache.doris.alter.AlterJob.JobState.PENDING
-                    || job.getState() == org.apache.doris.alter.AlterJob.JobState.RUNNING)) {
-                job.replayInitJob(db);
-            }
+        if (size > 0) {
+            // There should be no old alter jobs, if exist throw exception, should not use this FE version
+            throw new IOException("There are [" + size + "] old alter jobs, should not happen");
         }
 
         if (Catalog.getCurrentCatalogJournalVersion() >= 2) {
             // finished or cancelled jobs
-            long currentTimeMs = System.currentTimeMillis();
             size = dis.readInt();
             newChecksum ^= size;
-            for (int i = 0; i < size; i++) {
-                long tableId = dis.readLong();
-                newChecksum ^= tableId;
-                AlterJob job = AlterJob.read(dis);
-                if ((currentTimeMs - job.getCreateTimeMs()) / 1000 <= Config.history_job_keep_max_second) {
-                    // delete history jobs
-                    finishedOrCancelledAlterJobs.add(job);
-                }
+            if (size > 0) {
+                throw new IOException("There are [" + size + "] old finished or cancelled alter jobs, should not happen");

Review comment:
       ```suggestion
                throw new IOException("There are [" + size + "] old finished or cancelled alter jobs. Please downgrade FE to an older version and handle residual jobs");
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -1803,9 +1803,7 @@ public long loadAlterJob(DataInputStream dis, long checksum) throws IOException
         long newChecksum = checksum;
         for (JobType type : JobType.values()) {
             if (type == JobType.DECOMMISSION_BACKEND) {
-                if (Catalog.getCurrentCatalogJournalVersion() >= 5) {
-                    newChecksum = loadAlterJob(dis, newChecksum, type);
-                }
+                throw new IOException("There should be no DECOMMISSION_BACKEND jobs, not use this FE version");

Review comment:
       ```suggestion
                throw new IOException("There should be no DECOMMISSION_BACKEND jobs. Please downgrade FE to an older version and handle residual jobs");
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
##########
@@ -71,26 +67,12 @@ public SystemHandler() {
         super("cluster");
     }
 
-    @Override

Review comment:
       I think we can refactor `SystemHandler`: it no longer needs to extend `AlterHandler`, because there is no `DecommissionBackendJob` now.
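   A rough sketch of what a standalone handler could look like (purely illustrative: hypothetical class and method names, not the real `SystemHandler` API; in practice it would likely reuse the existing daemon base class and keep its current `process`/`cancel` entry points):
```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical standalone handler: no AlterHandler inheritance, no alter-job maps.
// It only tracks which backends are being decommissioned and checks them periodically.
public class SystemHandlerSketch extends Thread {

    // backend ids currently being decommissioned (stand-in for real Backend objects)
    private final ConcurrentHashMap<Long, Boolean> decommissioningBackends = new ConcurrentHashMap<>();

    /** Entry point for ALTER SYSTEM DECOMMISSION BACKEND ... */
    public void decommission(List<Long> backendIds) {
        for (long id : backendIds) {
            decommissioningBackends.put(id, Boolean.TRUE);
        }
    }

    /** Entry point for CANCEL DECOMMISSION BACKEND ... */
    public void cancelDecommission(List<Long> backendIds) {
        backendIds.forEach(decommissioningBackends::remove);
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            // periodically drop backends whose tablets have all migrated away
            decommissioningBackends.keySet().removeIf(this::isBackendEmpty);
            try {
                Thread.sleep(10_000L);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    private boolean isBackendEmpty(long backendId) {
        return false; // placeholder: the real check would query tablet counts on the backend
    }
}
```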

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
##########
@@ -2160,43 +2128,19 @@ public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOE
     }
 
     public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
-        Map<Long, AlterJob> alterJobs = null;
-        ConcurrentLinkedQueue<AlterJob> finishedOrCancelledAlterJobs = null;
         Map<Long, AlterJobV2> alterJobsV2 = Maps.newHashMap();
-        if (type == JobType.ROLLUP) {
-            alterJobs = this.getRollupHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getRollupHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getRollupHandler().getAlterJobsV2();
-        } else if (type == JobType.SCHEMA_CHANGE) {
-            alterJobs = this.getSchemaChangeHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getSchemaChangeHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-            alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
-        } else if (type == JobType.DECOMMISSION_BACKEND) {
-            alterJobs = this.getClusterHandler().unprotectedGetAlterJobs();
-            finishedOrCancelledAlterJobs = this.getClusterHandler().unprotectedGetFinishedOrCancelledAlterJobs();
-        }
 
-        // alter jobs
-        int size = alterJobs.size();
+        // alter jobs == 0
+        // If the FE version upgrade from old version, if it have alter jobs, the FE will failed during start process
+        // the number of old version alter jobs has to be 0
+        int size = 0;

Review comment:
       I see



