This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f7307d  [Spark Load]Add spark etl job main class (#3927)
3f7307d is described below

commit 3f7307d685ac53eede87c506f257d71fdddbf648
Author: wyb <wyb...@gmail.com>
AuthorDate: Wed Jun 24 13:54:55 2020 +0800

    [Spark Load]Add spark etl job main class (#3927)
    
    1. Add SparkEtlJob class
    2. Remove DppResult comment
    3. Support loading from hive table directly
    
    #3433
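    
    Note: the behavior added here stays behind the new enable_spark_load FE config
    (default false, mutable, masterOnly; see the Config.java hunk below). As an
    illustration only, not part of this commit, a mutable FE config like this can
    be flipped at runtime on the master FE with the standard admin statement:
    
        ADMIN SET FRONTEND CONFIG ("enable_spark_load" = "true");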
---
 be/src/olap/push_handler.cpp                       |   6 +-
 be/src/olap/push_handler.h                         |   2 +-
 .../org/apache/doris/analysis/CreateTableStmt.java |   5 +-
 .../java/org/apache/doris/analysis/GrantStmt.java  |   6 +-
 .../java/org/apache/doris/analysis/LoadStmt.java   |   6 +-
 .../java/org/apache/doris/analysis/RevokeStmt.java |   3 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  13 +-
 .../java/org/apache/doris/catalog/ResourceMgr.java |   7 +-
 .../main/java/org/apache/doris/common/Config.java  |   6 +
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../org/apache/doris/journal/JournalEntity.java    |   4 +-
 .../org/apache/doris/load/BrokerFileGroup.java     |   8 +-
 .../main/java/org/apache/doris/load/EtlStatus.java |  13 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   3 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   1 -
 .../org/apache/doris/load/loadv2/LoadManager.java  |  12 +-
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  18 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  11 +-
 .../load/loadv2/dpp/DorisKryoRegistrator.java      |   4 +
 .../doris/load/loadv2/dpp/GlobalDictBuilder.java   |   6 +-
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java |  21 +-
 .../apache/doris/load/loadv2/etl/EtlJobConfig.java |   4 +
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java  | 250 +++++++++++++++++++++
 .../org/apache/doris/mysql/privilege/PaloAuth.java |   9 +-
 .../org/apache/doris/mysql/privilege/PaloRole.java |  10 +-
 .../doris/persist/DropResourceOperationLog.java    |  53 +++++
 .../java/org/apache/doris/persist/EditLog.java     |  12 +-
 .../java/org/apache/doris/persist/PrivInfo.java    |  10 +-
 .../org/apache/doris/analysis/GrantStmtTest.java   |   3 +-
 .../org/apache/doris/analysis/LoadStmtTest.java    |   3 +-
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java  |  13 +-
 .../doris/load/loadv2/etl/SparkEtlJobTest.java     | 152 +++++++++++++
 .../org/apache/doris/mysql/privilege/AuthTest.java |   3 +-
 33 files changed, 556 insertions(+), 125 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 62e80d8..fa5c6bd 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -944,9 +944,9 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
         LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg();
         return OLAP_ERR_PUSH_INIT_ERROR;
     }
-    _runtime_profile.reset(_runtime_state->runtime_profile());
+    _runtime_profile = _runtime_state->runtime_profile();
     _runtime_profile->set_name("PushBrokerReader");
-    _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
+    _mem_tracker.reset(new MemTracker(_runtime_profile, -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker()));
     _mem_pool.reset(new MemPool(_mem_tracker.get()));
     _counter.reset(new ScannerCounter());
 
@@ -955,7 +955,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema,
     switch (t_scan_range.ranges[0].format_type) {
     case TFileFormatType::FORMAT_PARQUET:
         scanner = new ParquetScanner(_runtime_state.get(),
-                                  _runtime_profile.get(),
+                                  _runtime_profile,
                                   t_scan_range.params,
                                   t_scan_range.ranges,
                                   t_scan_range.broker_addresses,
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index a48716e..181905d 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -247,7 +247,7 @@ private:
     Tuple* _tuple;
     const Schema* _schema;
     std::unique_ptr<RuntimeState> _runtime_state;
-    std::unique_ptr<RuntimeProfile> _runtime_profile;
+    RuntimeProfile* _runtime_profile;
     std::unique_ptr<MemTracker> _mem_tracker;
     std::unique_ptr<MemPool> _mem_pool;
     std::unique_ptr<ScannerCounter> _counter;
diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index ded7c78..a31983b 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -94,9 +94,6 @@ public class CreateTableStmt extends DdlStmt {
     // for backup. set to -1 for normal use
     private int tableSignature;
 
-    // TODO(wyb): spark-load
-    private static boolean disableHiveTable = true;
-
     public CreateTableStmt() {
         // for persist
         tableName = new TableName();
@@ -259,7 +256,7 @@ public class CreateTableStmt extends DdlStmt {
         analyzeEngineName();
 
         // TODO(wyb): spark-load
-        if (engineName.equals("hive") && disableHiveTable) {
+        if (engineName.equals("hive") && !Config.enable_spark_load) {
             throw new AnalysisException("Spark Load from hive table is comming 
soon");
         }
         // analyze key desc
diff --git a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
index 3d7674a..3df0335 100644
--- a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.AccessPrivilege;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeNameFormat;
@@ -46,9 +47,6 @@ public class GrantStmt extends DdlStmt {
     private ResourcePattern resourcePattern;
     private List<PaloPrivilege> privileges;
 
-    // TODO(wyb): spark-load
-    public static boolean disableGrantResource = true;
-
     public GrantStmt(UserIdentity userIdent, String role, TablePattern tblPattern, List<AccessPrivilege> privileges) {
         this.userIdent = userIdent;
         this.role = role;
@@ -111,7 +109,7 @@ public class GrantStmt extends DdlStmt {
             tblPattern.analyze(analyzer.getClusterName());
         } else {
             // TODO(wyb): spark-load
-            if (disableGrantResource) {
+            if (!Config.enable_spark_load) {
                 throw new AnalysisException("GRANT ON RESOURCE is comming 
soon");
             }
             resourcePattern.analyze();
diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
index a9f87c2..b3636b6 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
@@ -100,9 +101,6 @@ public class LoadStmt extends DdlStmt {
 
     private String version = "v2";
 
-    // TODO(wyb): spark-load
-    public static boolean disableSparkLoad = true;
-
     // properties set
     private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(TIMEOUT_PROPERTY)
@@ -288,7 +286,7 @@ public class LoadStmt extends DdlStmt {
             resourceDesc.analyze();
             etlJobType = resourceDesc.getEtlJobType();
             // TODO(wyb): spark-load
-            if (disableSparkLoad) {
+            if (!Config.enable_spark_load) {
                 throw new AnalysisException("Spark Load is comming soon");
             }
             // check resource usage privilege
diff --git a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
index 61dd664..9d2ce60 100644
--- a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.AccessPrivilege;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.mysql.privilege.PaloPrivilege;
 import org.apache.doris.mysql.privilege.PrivBitSet;
@@ -98,7 +99,7 @@ public class RevokeStmt extends DdlStmt {
             tblPattern.analyze(analyzer.getClusterName());
         } else {
             // TODO(wyb): spark-load
-            if (GrantStmt.disableGrantResource) {
+            if (!Config.enable_spark_load) {
                 throw new AnalysisException("REVOKE ON RESOURCE is comming 
soon");
             }
             resourcePattern.analyze();
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 0d05468..4f2b1b7 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1460,6 +1460,7 @@ public class Catalog {
             checksum = loadGlobalVariable(dis, checksum);
             checksum = loadCluster(dis, checksum);
             checksum = loadBrokers(dis, checksum);
+            checksum = loadResources(dis, checksum);
             checksum = loadExportJob(dis, checksum);
             checksum = loadBackupHandler(dis, checksum);
             checksum = loadPaloAuth(dis, checksum);
@@ -1468,8 +1469,6 @@ public class Catalog {
             checksum = loadColocateTableIndex(dis, checksum);
             checksum = loadRoutineLoadJobs(dis, checksum);
             checksum = loadLoadJobsV2(dis, checksum);
-            // TODO(wyb): spark-load
-            //checksum = loadResources(dis, checksum);
             checksum = loadSmallFiles(dis, checksum);
             checksum = loadPlugins(dis, checksum);
             checksum = loadDeleteHandler(dis, checksum);
@@ -1872,13 +1871,10 @@ public class Catalog {
     }
 
     public long loadResources(DataInputStream in, long checksum) throws IOException {
-        // TODO(wyb): spark-load
-        /*
-        if (MetaContext.get().getMetaVersion() >= FeMetaVersion.new_version_by_wyb) {
-            resourceMgr = ResourceMgr.read(in);
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
+            resourceMgr = ResourceMgr.read(in);
         }
         LOG.info("finished replay resources from image");
-         */
         return checksum;
     }
 
@@ -1927,6 +1923,7 @@ public class Catalog {
             checksum = saveGlobalVariable(dos, checksum);
             checksum = saveCluster(dos, checksum);
             checksum = saveBrokers(dos, checksum);
+            checksum = saveResources(dos, checksum);
             checksum = saveExportJob(dos, checksum);
             checksum = saveBackupHandler(dos, checksum);
             checksum = savePaloAuth(dos, checksum);
@@ -1934,8 +1931,6 @@ public class Catalog {
             checksum = saveColocateTableIndex(dos, checksum);
             checksum = saveRoutineLoadJobs(dos, checksum);
             checksum = saveLoadJobsV2(dos, checksum);
-            // TODO(wyb): spark-load
-            //checksum = saveResources(dos, checksum);
             checksum = saveSmallFiles(dos, checksum);
             checksum = savePlugins(dos, checksum);
             checksum = saveDeleteHandler(dos, checksum);
diff --git a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 3932cee..f5b6a9f 100644
--- a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.common.proc.ProcNodeInterface;
 import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.DropResourceOperationLog;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 
@@ -88,12 +89,12 @@ public class ResourceMgr implements Writable {
         }
 
         // log drop
-        Catalog.getCurrentCatalog().getEditLog().logDropResource(name);
+        Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(name));
         LOG.info("drop resource success. resource name: {}", name);
     }
 
-    public void replayDropResource(String name) {
-        nameToResource.remove(name);
+    public void replayDropResource(DropResourceOperationLog operationLog) {
+        nameToResource.remove(operationLog.getName());
     }
 
     public boolean containsResource(String name) {
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index ef38967..7ca2170 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -1088,5 +1088,11 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean drop_backend_after_decommission = true;
+
+    /*
+     * enable spark load for temporary use
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_spark_load = false;
 }
 
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index dabd7a8..7c52c94 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -183,6 +183,8 @@ public final class FeMetaVersion {
     public static final int VERSION_85 = 85;
     // serialize origStmt in rollupJob and mv meta
     public static final int VERSION_86 = 86;
+    // spark resource, resource privilege, broker file group for hive table
+    public static final int VERSION_87 = 87;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_86;
+    public static final int VERSION_CURRENT = VERSION_87;
 }
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index b707a15..5704bc7 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -58,6 +58,7 @@ import org.apache.doris.persist.DatabaseInfo;
 import org.apache.doris.persist.DropInfo;
 import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo;
 import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.DropResourceOperationLog;
 import org.apache.doris.persist.HbPackage;
 import org.apache.doris.persist.ModifyPartitionInfo;
 import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@@ -511,8 +512,7 @@ public class JournalEntity implements Writable {
                 break;
             }
             case OperationType.OP_DROP_RESOURCE: {
-                data = new Text();
-                ((Text) data).readFields(in);
+                data = DropResourceOperationLog.read(in);
                 isRead = true;
                 break;
             }
diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 78115b5..7a450a6 100644
--- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -358,11 +358,8 @@ public class BrokerFileGroup implements Writable {
         }
 
         // src table
-        // TODO(wyb): spark-load
-        /*
         out.writeLong(srcTableId);
         out.writeBoolean(isLoadFromTable);
-         */
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -414,13 +411,10 @@ public class BrokerFileGroup implements Writable {
             }
         }
         // src table
-        // TODO(wyb): spark-load
-        /*
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version) {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
             srcTableId = in.readLong();
             isLoadFromTable = in.readBoolean();
         }
-         */
 
         // There are no columnExprList in the previous load job which is created before function is supported.
         // The columnExprList could not be analyzed without origin stmt in the previous load job.
diff --git a/fe/src/main/java/org/apache/doris/load/EtlStatus.java b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
index bafebc4..bc01f43 100644
--- a/fe/src/main/java/org/apache/doris/load/EtlStatus.java
+++ b/fe/src/main/java/org/apache/doris/load/EtlStatus.java
@@ -20,7 +20,7 @@ package org.apache.doris.load;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
 import org.apache.doris.thrift.TEtlState;
 
 import com.google.common.base.Strings;
@@ -46,7 +46,7 @@ public class EtlStatus implements Writable {
     // 0 - 100
     private int progress;
     private String failMsg;
-    //private DppResult dppResult;
+    private DppResult dppResult;
 
     public EtlStatus() {
         this.state = TEtlState.RUNNING;
@@ -56,7 +56,7 @@ public class EtlStatus implements Writable {
         this.fileMap = Maps.newHashMap();
         this.progress = 0;
         this.failMsg = "";
-        //this.dppResult = null;
+        this.dppResult = null;
     }
 
     public TEtlState getState() {
@@ -128,8 +128,6 @@ public class EtlStatus implements Writable {
         this.failMsg = failMsg;
     }
 
-    // TODO(wyb): spark-load
-    /*
     public DppResult getDppResult() {
         return dppResult;
     }
@@ -137,7 +135,6 @@ public class EtlStatus implements Writable {
     public void setDppResult(DppResult dppResult) {
         this.dppResult = dppResult;
     }
-    */
 
     public void reset() {
         this.stats.clear();
@@ -145,7 +142,7 @@ public class EtlStatus implements Writable {
         this.fileMap.clear();
         this.progress = 0;
         this.failMsg = "";
-        //this.dppResult = null;
+        this.dppResult = null;
     }
 
     @Override
@@ -158,7 +155,7 @@ public class EtlStatus implements Writable {
                 ", fileMap=" + fileMap +
                 ", progress=" + progress +
                 ", failMsg='" + failMsg + '\'' +
-                //", dppResult='" + dppResult + '\'' +
+                ", dppResult='" + dppResult + '\'' +
                 '}';
     }
 
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index a42f4e7..9ca4c77 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -248,7 +249,7 @@ public class BrokerLoadJob extends BulkLoadJob {
 
         // check data quality
         if (!checkDataQuality()) {
-            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
                     true, true);
             return;
         }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 28b556c..e9d89d7 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -82,7 +82,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
     private static final Logger LOG = LogManager.getLogger(LoadJob.class);
 
-    protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel";
     protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL";
     protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
     public static final String UNSELECTED_ROWS = "unselected.rows";
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index c3f25e1..9b0741b 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -17,10 +17,6 @@
 
 package org.apache.doris.load.loadv2;
 
-import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
-import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
-import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
-
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
@@ -159,7 +155,7 @@ public class LoadManager implements Writable{
             return e.getTxnId();
         } catch (UserException e) {
             if (loadJob != null) {
-                loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), false,
+                loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
                         false /* no need to write edit log, because createLoadJob log is not wrote yet */);
             }
             throw e;
@@ -403,11 +399,11 @@ public class LoadManager implements Writable{
                         ((SparkLoadJob) job).updateEtlStatus();
                     } catch (DataQualityException e) {
                         LOG.info("update load job etl status failed. job id: 
{}", job.getId(), e);
-                        job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG),
+                        job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
                                                   true, true);
                     } catch (UserException e) {
                         LOG.warn("update load job etl status failed. job id: 
{}", job.getId(), e);
-                        job.cancelJobWithoutCheck(new FailMsg(ETL_RUN_FAIL, e.getMessage()), true, true);
+                        job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
                     } catch (Exception e) {
                         LOG.warn("update load job etl status failed. job id: 
{}", job.getId(), e);
                     }
@@ -422,7 +418,7 @@ public class LoadManager implements Writable{
                         ((SparkLoadJob) job).updateLoadingStatus();
                     } catch (UserException e) {
                         LOG.warn("update load job loading status failed. job 
id: {}", job.getId(), e);
-                        job.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), true, true);
+                        job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
                     } catch (Exception e) {
                         LOG.warn("update load job loading status failed. job 
id: {}", job.getId(), e);
                     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 84013c4..ff295d6 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -25,8 +25,9 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.load.EtlStatus;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.etl.SparkEtlJob;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TEtlState;
 
@@ -46,12 +47,12 @@ import org.apache.spark.launcher.SparkAppHandle.Listener;
 import org.apache.spark.launcher.SparkAppHandle.State;
 import org.apache.spark.launcher.SparkLauncher;
 
-//import com.google.common.base.Strings;
+import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-//import com.google.gson.Gson;
-//import com.google.gson.JsonSyntaxException;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -73,7 +74,6 @@ public class SparkEtlJobHandler {
     private static final String CONFIG_FILE_NAME = "jobconfig.json";
     private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME;
     private static final String JOB_CONFIG_DIR = "configs";
-    private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob";
     private static final String ETL_JOB_NAME = "doris__%s";
     // 5min
     private static final int GET_APPID_MAX_RETRY_TIMES = 300;
@@ -112,10 +112,7 @@ public class SparkEtlJobHandler {
         launcher.setMaster(resource.getMaster())
                 .setDeployMode(resource.getDeployMode().name().toLowerCase())
                 .setAppResource(appResourceHdfsPath)
-                // TODO(wyb): spark-load
-                // replace with getCanonicalName later
-                //.setMainClass(SparkEtlJob.class.getCanonicalName())
-                .setMainClass(MAIN_CLASS)
+                .setMainClass(SparkEtlJob.class.getCanonicalName())
                 .setAppName(String.format(ETL_JOB_NAME, loadLabel))
                 .addAppArgs(jobConfigHdfsPath);
         // spark configs
@@ -220,8 +217,6 @@ public class SparkEtlJobHandler {
 
         if (status.getState() == TEtlState.FINISHED || status.getState() == TEtlState.CANCELLED) {
             // get dpp result
-            // TODO(wyb): spark-load
-            /*
             String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath);
             try {
                 byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
@@ -234,7 +229,6 @@ public class SparkEtlJobHandler {
             } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                 LOG.warn("read broker file failed. path: {}", 
dppResultFilePath, e);
             }
-            */
         }
 
         return status;
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 715b6af..f1dff69 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load.loadv2;
 
-import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG;
-
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.DescriptorTable;
@@ -58,7 +56,7 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
-//import org.apache.doris.load.loadv2.dpp.DppResult;
+import org.apache.doris.load.loadv2.dpp.DppResult;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.service.FrontendOptions;
@@ -76,7 +74,7 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
-//import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TabletQuorumFailedException;
@@ -313,8 +311,6 @@ public class SparkLoadJob extends BulkLoadJob {
             loadingStatus.setTrackingUrl(appId);
         }
 
-        // TODO(wyb): spark-load
-        /*
         DppResult dppResult = etlStatus.getDppResult();
         if (dppResult != null) {
             // update load statistic and counters when spark etl job finished
@@ -331,14 +327,13 @@ public class SparkLoadJob extends BulkLoadJob {
             counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
             counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
         }
-        */
     }
 
     private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler handler) throws Exception {
         unprotectedUpdateEtlStatusInternal(etlStatus);
         // checkDataQuality
         if (!checkDataQuality()) {
-            throw new DataQualityException(QUALITY_FAIL_MSG);
+            throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
         }
 
         // get etl output files and update loading state
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
index d2568bb..f0528ec 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
@@ -20,10 +20,14 @@ package org.apache.doris.load.loadv2.dpp;
 import com.esotericsoftware.kryo.Kryo;
 import org.apache.spark.serializer.KryoRegistrator;
 
+/**
+ * register etl classes with Kryo when using Kryo serialization.
+ */
 public class DorisKryoRegistrator implements KryoRegistrator {
 
     @Override
     public void registerClasses(Kryo kryo) {
         kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
+        kryo.register(org.apache.doris.load.loadv2.BitmapValue.class);
     }
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
index b220b40..8275edf 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.Column;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -200,7 +200,7 @@ public class GlobalDictBuilder {
                     maxDictValue = (long)row.get(0);
                     minDictValue = (long)row.get(1);
                 }
-                LOG.info(" column {} 's max value in dict is {} , min value is 
{}", distinctColumnNameTmp, maxDictValue, minDictValue);
+                LOG.info(" column " + distinctColumnNameTmp + " 's max value 
in dict is " + maxDictValue + ", min value is " + minDictValue);
                 // maybe never happened, but we need detect it
                 if (minDictValue < 0) {
                     throw new RuntimeException(String.format(" column %s 's 
cardinality has exceed bigint's max value", distinctColumnNameTmp));
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 80becc4..3bc8c95 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -746,11 +746,18 @@ public final class SparkDpp implements java.io.Serializable {
     }
 
     private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
-                                               String hiveTableName,
+                                               String hiveDbTableName,
                                                EtlJobConfig.EtlIndex baseIndex,
                                                EtlJobConfig.EtlFileGroup fileGroup,
                                                StructType dstTableSchema) throws UserException {
-        Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName);
+        // select base index columns from hive table
+        StringBuilder sql = new StringBuilder();
+        sql.append("select ");
+        baseIndex.columns.forEach(column -> {
+            sql.append(column.columnName).append(",");
+        });
+        sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName);
+        Dataset<Row> dataframe = spark.sql(sql.toString());
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
         return dataframe;
     }
@@ -805,13 +812,13 @@ public final class SparkDpp implements java.io.Serializable {
                 for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
                     List<String> filePaths = fileGroup.filePaths;
                     Dataset<Row> fileGroupDataframe = null;
-                    if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) {
+                    EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
+                    if (sourceType == EtlJobConfig.SourceType.FILE) {
                         fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
+                    } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
+                        fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
                     } else {
-                        String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
-                        String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME,
-                                                                          tableId, taskId);
-                        fileGroupDataframe = loadDataFromHiveTable(spark, dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema);
+                        throw new RuntimeException("Unknown source type: " + 
sourceType.name());
                     }
                     if (fileGroupDataframe == null) {
                         LOG.info("no data for file file group:" + fileGroup);
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index b9767f7..6e6756c 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -486,6 +486,10 @@ public class EtlJobConfig implements Serializable {
         @SerializedName(value = "hiveTableProperties")
         public Map<String, String> hiveTableProperties;
 
+        // hive db table used in dpp, not serialized
+        // set with hiveDbTableName (no bitmap column) or IntermediateHiveTable (created by global dict builder) in spark etl job
+        public String dppHiveDbTableName;
+
         // for data infile path
         public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String> fileFieldNames,
                             List<String> columnsFromPath, String columnSeparator, String lineDelimiter,
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
new file mode 100644
index 0000000..6fe8291
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.etl;
+
+import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
+import org.apache.doris.load.loadv2.dpp.SparkDpp;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * SparkEtlJob is responsible for global dict building, data partition, data sort and data aggregation.
+ * 1. init job config
+ * 2. check if job has bitmap_dict function columns
+ * 3. build global dict if step 2 is true
+ * 4. dpp (data partition, data sort and data aggregation)
+ */
+public class SparkEtlJob {
+    private static final Logger LOG = LogManager.getLogger(SparkEtlJob.class);
+
+    private static final String BITMAP_DICT_FUNC = "bitmap_dict";
+    private static final String TO_BITMAP_FUNC = "to_bitmap";
+
+    private String jobConfigFilePath;
+    private EtlJobConfig etlJobConfig;
+    private Set<Long> hiveSourceTables;
+    private Map<Long, Set<String>> tableToBitmapDictColumns;
+    private SparkSession spark;
+
+    private SparkEtlJob(String jobConfigFilePath) {
+        this.jobConfigFilePath = jobConfigFilePath;
+        this.etlJobConfig = null;
+        this.hiveSourceTables = Sets.newHashSet();
+        this.tableToBitmapDictColumns = Maps.newHashMap();
+    }
+
+    private void initSparkEnvironment() {
+        SparkConf conf = new SparkConf();
+        //serialization conf
+        conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "false");
+        spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate();
+    }
+
+    private void initSparkConfigs(Map<String, String> configs) {
+        if (configs == null) {
+            return;
+        }
+        for (Map.Entry<String, String> entry : configs.entrySet()) {
+            spark.sparkContext().conf().set(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void initConfig() {
+        LOG.info("job config file path: " + jobConfigFilePath);
+        Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
+        String jsonConfig = ds.first();
+        LOG.info("rdd read json config: " + jsonConfig);
+        etlJobConfig = EtlJobConfig.configFromJson(jsonConfig);
+        LOG.info("etl job config: " + etlJobConfig);
+    }
+
+    /*
+     * 1. check bitmap column
+     * 2. fill tableToBitmapDictColumns
+     * 3. remove bitmap_dict and to_bitmap mapping from columnMappings
+     */
+    private void checkConfig() throws Exception {
+        for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) {
+            boolean isHiveSource = false;
+            Set<String> bitmapDictColumns = Sets.newHashSet();
+            for (EtlFileGroup fileGroup : entry.getValue().fileGroups) {
+                if (fileGroup.sourceType == EtlJobConfig.SourceType.HIVE) {
+                    isHiveSource = true;
+                }
+                Map<String, EtlColumnMapping> newColumnMappings = Maps.newHashMap();
+                for (Map.Entry<String, EtlColumnMapping> mappingEntry : fileGroup.columnMappings.entrySet()) {
+                    String columnName = mappingEntry.getKey();
+                    String exprStr = mappingEntry.getValue().toDescription();
+                    String funcName = functions.expr(exprStr).expr().prettyName();
+                    if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
+                        bitmapDictColumns.add(columnName);
+                    } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
+                        newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
+                    }
+                }
+                // reset new columnMappings
+                fileGroup.columnMappings = newColumnMappings;
+            }
+            if (isHiveSource) {
+                hiveSourceTables.add(entry.getKey());
+            }
+            if (!bitmapDictColumns.isEmpty()) {
+                tableToBitmapDictColumns.put(entry.getKey(), bitmapDictColumns);
+            }
+        }
+        LOG.info("init hiveSourceTables: " + hiveSourceTables + ", 
tableToBitmapDictColumns: " + tableToBitmapDictColumns);
+
+        // spark etl must have only one table with bitmap type column to process.
+        if (hiveSourceTables.size() > 1 || tableToBitmapDictColumns.size() > 1) {
+            throw new Exception("spark etl job must have only one hive table with bitmap type column to process");
+        }
+    }
+
+    private void processDpp() throws Exception {
+        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+        sparkDpp.init();
+        sparkDpp.doDpp();
+    }
+
+    private String buildGlobalDictAndEncodeSourceTable(EtlTable table, long tableId) {
+        // dict column map
+        MultiValueMap dictColumnMap = new MultiValueMap();
+        for (String dictColumn : tableToBitmapDictColumns.get(tableId)) {
+            dictColumnMap.put(dictColumn, null);
+        }
+
+        // doris schema
+        List<String> dorisOlapTableColumnList = Lists.newArrayList();
+        for (EtlIndex etlIndex : table.indexes) {
+            if (etlIndex.isBaseIndex) {
+                for (EtlColumn column : etlIndex.columns) {
+                    dorisOlapTableColumnList.add(column.columnName);
+                }
+            }
+        }
+
+        // hive db and tables
+        EtlFileGroup fileGroup = table.fileGroups.get(0);
+        String sourceHiveDBTableName = fileGroup.hiveDbTableName;
+        String dorisHiveDB = sourceHiveDBTableName.split("\\.")[0];
+        String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
+        String globalDictTableName = String.format(EtlJobConfig.GLOBAL_DICT_TABLE_NAME, tableId);
+        String distinctKeyTableName = String.format(EtlJobConfig.DISTINCT_KEY_TABLE_NAME, tableId, taskId);
+        String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, tableId, taskId);
+        String sourceHiveFilter = fileGroup.where;
+
+        // others
+        List<String> mapSideJoinColumns = Lists.newArrayList();
+        int buildConcurrency = 1;
+        List<String> veryHighCardinalityColumn = Lists.newArrayList();
+        int veryHighCardinalityColumnSplitNum = 1;
+
+        LOG.info("global dict builder args, dictColumnMap: " + dictColumnMap
+                         + ", dorisOlapTableColumnList: " + 
dorisOlapTableColumnList
+                         + ", sourceHiveDBTableName: " + sourceHiveDBTableName
+                         + ", sourceHiveFilter: "+ sourceHiveFilter
+                         + ", distinctKeyTableName: " + distinctKeyTableName
+                         + ", globalDictTableName: " + globalDictTableName
+                         + ", dorisIntermediateHiveTable: " + 
dorisIntermediateHiveTable);
+        try {
+            GlobalDictBuilder globalDictBuilder = new GlobalDictBuilder(
+                    dictColumnMap, dorisOlapTableColumnList, mapSideJoinColumns, sourceHiveDBTableName,
+                    sourceHiveFilter, dorisHiveDB, distinctKeyTableName, globalDictTableName, dorisIntermediateHiveTable,
+                    buildConcurrency, veryHighCardinalityColumn, veryHighCardinalityColumnSplitNum, spark);
+            globalDictBuilder.createHiveIntermediateTable();
+            globalDictBuilder.extractDistinctColumn();
+            globalDictBuilder.buildGlobalDict();
+            globalDictBuilder.encodeDorisIntermediateHiveTable();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return String.format("%s.%s", dorisHiveDB, dorisIntermediateHiveTable);
+    }
+
+    private void processData() throws Exception {
+        if (!hiveSourceTables.isEmpty()) {
+            // only one table
+            long tableId = -1;
+            EtlTable table = null;
+            for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) {
+                tableId = entry.getKey();
+                table = entry.getValue();
+                break;
+            }
+
+            // init hive configs like metastore service
+            EtlFileGroup fileGroup = table.fileGroups.get(0);
+            initSparkConfigs(fileGroup.hiveTableProperties);
+            fileGroup.dppHiveDbTableName = fileGroup.hiveDbTableName;
+
+            // build global dict and encode source hive table if has bitmap dict columns
+            if (!tableToBitmapDictColumns.isEmpty() && tableToBitmapDictColumns.containsKey(tableId)) {
+                String dorisIntermediateHiveDbTableName = buildGlobalDictAndEncodeSourceTable(table, tableId);
+                // set with dorisIntermediateHiveDbTable
+                fileGroup.dppHiveDbTableName = dorisIntermediateHiveDbTableName;
+            }
+        }
+
+        // data partition sort and aggregation
+        processDpp();
+    }
+
+    private void run() throws Exception {
+        initSparkEnvironment();
+        initConfig();
+        checkConfig();
+        processData();
+    }
+
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("missing job config file path arg");
+            System.exit(-1);
+        }
+
+        try {
+            new SparkEtlJob(args[0]).run();
+        } catch (Exception e) {
+            System.err.println("spark etl job run failed");
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
index 19610c7..bfd2e32 100644
--- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
+++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.DppConfig;
@@ -1312,8 +1313,7 @@ public class PaloAuth implements Writable {
         userPrivTable.write(out);
         dbPrivTable.write(out);
         tablePrivTable.write(out);
-        // TODO(wyb): spark-load
-        //resourcePrivTable.write(out);
+        resourcePrivTable.write(out);
         propertyMgr.write(out);
     }
 
@@ -1322,12 +1322,9 @@ public class PaloAuth implements Writable {
         userPrivTable = (UserPrivTable) PrivTable.read(in);
         dbPrivTable = (DbPrivTable) PrivTable.read(in);
         tablePrivTable = (TablePrivTable) PrivTable.read(in);
-        // TODO(wyb): spark-load
-        /*
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
             resourcePrivTable = (ResourcePrivTable) PrivTable.read(in);
         }
-         */
         propertyMgr = UserPropertyMgr.read(in);
 
         if (userPrivTable.isEmpty()) {
diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
index 7d2f123..d9d77e4 100644
--- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
+++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java
@@ -20,6 +20,8 @@ package org.apache.doris.mysql.privilege;
 import org.apache.doris.analysis.ResourcePattern;
 import org.apache.doris.analysis.TablePattern;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 
@@ -142,14 +144,11 @@ public class PaloRole implements Writable {
             entry.getKey().write(out);
             entry.getValue().write(out);
         }
-        // TODO(wyb): spark-load
-        /*
         out.writeInt(resourcePatternToPrivs.size());
         for (Map.Entry<ResourcePattern, PrivBitSet> entry : resourcePatternToPrivs.entrySet()) {
             entry.getKey().write(out);
             entry.getValue().write(out);
         }
-         */
         out.writeInt(users.size());
         for (UserIdentity userIdentity : users) {
             userIdentity.write(out);
@@ -164,9 +163,7 @@ public class PaloRole implements Writable {
             PrivBitSet privs = PrivBitSet.read(in);
             tblPatternToPrivs.put(tblPattern, privs);
         }
-        // TODO(wyb): spark-load
-        /*
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
             size = in.readInt();
             for (int i = 0; i < size; i++) {
                 ResourcePattern resourcePattern = ResourcePattern.read(in);
@@ -174,7 +171,6 @@ public class PaloRole implements Writable {
                 resourcePatternToPrivs.put(resourcePattern, privs);
             }
         }
-         */
         size = in.readInt();
         for (int i = 0; i < size; i++) {
             UserIdentity userIdentity = UserIdentity.read(in);
diff --git a/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java
new file mode 100644
index 0000000..8c69763
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * For resource drop
+ */
+public class DropResourceOperationLog implements Writable {
+    @SerializedName(value = "name")
+    private String name;
+
+    public DropResourceOperationLog(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static DropResourceOperationLog read(DataInput in) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(in), DropResourceOperationLog.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 7f3da6c..14bb416 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -696,8 +696,8 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_DROP_RESOURCE: {
-                    final String resourceName = journal.getData().toString();
-                    catalog.getResourceMgr().replayDropResource(resourceName);
-                    final DropResourceOperationLog operationLog = (DropResourceOperationLog) journal.getData();
+                    catalog.getResourceMgr().replayDropResource(operationLog);
                     break;
                 }
                 case OperationType.OP_CREATE_SMALL_FILE: {
@@ -1277,13 +1277,11 @@ public class EditLog {
     }
 
     public void logCreateResource(Resource resource) {
-        // TODO(wyb): spark-load
-        //logEdit(OperationType.OP_CREATE_RESOURCE, resource);
+        logEdit(OperationType.OP_CREATE_RESOURCE, resource);
     }
 
-    public void logDropResource(String resourceName) {
-        // TODO(wyb): spark-load
-        //logEdit(OperationType.OP_DROP_RESOURCE, new Text(resourceName));
+    public void logDropResource(DropResourceOperationLog operationLog) {
+        logEdit(OperationType.OP_DROP_RESOURCE, operationLog);
     }
 
     public void logCreateSmallFile(SmallFile info) {
diff --git a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
index 26b7394..b512fa6 100644
--- a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
+++ b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java
@@ -20,6 +20,8 @@ package org.apache.doris.persist;
 import org.apache.doris.analysis.ResourcePattern;
 import org.apache.doris.analysis.TablePattern;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.mysql.privilege.PrivBitSet;
@@ -117,15 +119,12 @@ public class PrivInfo implements Writable {
             out.writeBoolean(false);
         }
 
-        // TODO(wyb): spark-load
-        /*
         if (resourcePattern != null) {
             out.writeBoolean(true);
             resourcePattern.write(out);
         } else {
             out.writeBoolean(false);
         }
-         */
 
         if (privs != null) {
             out.writeBoolean(true);
@@ -159,14 +158,11 @@ public class PrivInfo implements Writable {
             tblPattern = TablePattern.read(in);
         }
 
-        // TODO(wyb): spark-load
-        /*
-        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) {
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) {
             if (in.readBoolean()) {
                 resourcePattern = ResourcePattern.read(in);
             }
         }
-         */
 
         if (in.readBoolean()) {
             privs = PrivBitSet.read(in);
diff --git a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
index dc8e1f4..7907433 100644
--- a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java
@@ -21,6 +21,7 @@ import mockit.Expectations;
 import org.apache.doris.catalog.AccessPrivilege;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.qe.ConnectContext;
@@ -97,7 +98,7 @@ public class GrantStmtTest {
     @Test
     public void testResourceNormal() throws UserException {
         // TODO(wyb): spark-load
-        GrantStmt.disableGrantResource = false;
+        Config.enable_spark_load = true;
 
         String resourceName = "spark0";
         List<AccessPrivilege> privileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV);
diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
index 993fda1..250f022 100644
--- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ResourceMgr;
 import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.mysql.privilege.PaloAuth;
@@ -107,7 +108,7 @@ public class LoadStmtTest {
 
         // test ResourceDesc
         // TODO(wyb): spark-load
-        LoadStmt.disableSparkLoad = false;
+        Config.enable_spark_load = true;
         stmt = new LoadStmt(new LabelName("testDb", "testLabel"), 
dataDescriptionList,
                             new ResourceDesc(resourceName, null), null);
         stmt.analyze(analyzer);
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index d288610..02a690b 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -157,9 +157,8 @@ public class SparkEtlJobHandlerTest {
                 result = trackingUrl;
                 report.getProgress();
                 returns(0.5f, 1f, 1f);
-                // TODO(wyb): spark-load
-                //BrokerUtil.readFile(anyString, (BrokerDesc) any);
-                //result = "{'normal_rows': 10, 'abnormal_rows': 0, 
'failed_reason': 'etl job failed'}";
+                BrokerUtil.readFile(anyString, (BrokerDesc) any);
+                result = "{'normal_rows': 10, 'abnormal_rows': 0, 
'failed_reason': 'etl job failed'}";
             }
         };
 
@@ -180,17 +179,15 @@ public class SparkEtlJobHandlerTest {
         status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc);
         Assert.assertEquals(TEtlState.CANCELLED, status.getState());
         Assert.assertEquals(100, status.getProgress());
-        // TODO(wyb): spark-load
-        //Assert.assertEquals("etl job failed", 
status.getDppResult().failedReason);
+        Assert.assertEquals("etl job failed", 
status.getDppResult().failedReason);
 
         // finished
         status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc);
         Assert.assertEquals(TEtlState.FINISHED, status.getState());
         Assert.assertEquals(100, status.getProgress());
         Assert.assertEquals(trackingUrl, status.getTrackingUrl());
-        // TODO(wyb): spark-load
-        //Assert.assertEquals(10, status.getDppResult().normalRows);
-        //Assert.assertEquals(0, status.getDppResult().abnormalRows);
+        Assert.assertEquals(10, status.getDppResult().normalRows);
+        Assert.assertEquals(0, status.getDppResult().abnormalRows);
     }
 
     @Test
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
new file mode 100644
index 0000000..681e027
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.etl;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SparkEtlJobTest {
+    private long tableId;
+    private long index1Id;
+    private long index2Id;
+    private long partition1Id;
+    private long partition2Id;
+    private EtlJobConfig etlJobConfig;
+
+    @Before
+    public void setUp() {
+        tableId = 0L;
+        index1Id = 1L;
+        index2Id = 2L;
+        partition1Id = 3L;
+        partition2Id = 4L;
+
+        // indexes
+        EtlColumn k1 = new EtlColumn("k1", "INT", false, true, "NONE", "0", 0, 
0, 0);
+        EtlColumn k2 = new EtlColumn("k2", "VARCHAR", false, true, "NONE", 
"0", 10, 0, 0);
+        EtlColumn v1 = new EtlColumn("v1", "BIGINT", false, false, "NONE", 
"0", 0, 0, 0);
+        EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2, 
v1), 666666, "DUPLICATE", true);
+        v1 = new EtlColumn("v1", "BIGINT", false, false, "SUM", "0", 0, 0, 0);
+        EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1), 
888888, "AGGREGATE", true);
+        List<EtlIndex> indexes = Lists.newArrayList(index1, index2);
+        // partition info
+        List<EtlPartition> partitions = Lists.newArrayList();
+        partitions.add(new EtlPartition(partition1Id, Lists.newArrayList(0), Lists.newArrayList(100), false, 2));
+        partitions.add(new EtlPartition(partition2Id, Lists.newArrayList(100), Lists.newArrayList(), true, 3));
+        EtlPartitionInfo partitionInfo = new EtlPartitionInfo("RANGE", Lists.newArrayList("k1"), Lists.newArrayList("k2"), partitions);
+        EtlTable table = new EtlTable(indexes, partitionInfo);
+        // file group
+        Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap();
+        columnMappings.put("k1", new EtlColumnMapping("k1 + 1"));
+        table.addFileGroup(new EtlFileGroup(EtlJobConfig.SourceType.FILE, Lists.newArrayList("hdfs://127.0.0.1:10000/file"),
+                                            Lists.newArrayList(), Lists.newArrayList(), "\t", "\n", false, null,
+                                            Maps.newHashMap(), "", Lists.newArrayList(partition1Id, partition2Id)));
+        // tables
+        Map<Long, EtlTable> tables = Maps.newHashMap();
+        tables.put(tableId, table);
+        // others
+        String outputFilePattern = "V1.label0.%d.%d.%d.%d.%d.parquet";
+        String label = "label0";
+        EtlJobProperty properties = new EtlJobProperty();
+        properties.strictMode = false;
+        properties.timezone = "Asia/Shanghai";
+        etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label, properties);
+    }
+
+    @Test
+    public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset<String> ds) {
+        new Expectations() {
+            {
+                SparkSession.builder().enableHiveSupport().getOrCreate();
+                result = spark;
+                spark.read().textFile(anyString);
+                result = ds;
+                ds.first();
+                result = etlJobConfig.configToJson();
+            }
+        };
+
+        SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
+        Deencapsulation.invoke(job, "initSparkEnvironment");
+        Deencapsulation.invoke(job, "initConfig");
+        EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig");
+        Assert.assertTrue(parsedConfig.tables.containsKey(tableId));
+        EtlTable table = parsedConfig.tables.get(tableId);
+        Assert.assertEquals(2, table.indexes.size());
+        Assert.assertEquals(2, table.partitionInfo.partitions.size());
+        Assert.assertEquals(false, parsedConfig.properties.strictMode);
+        Assert.assertEquals("label0", parsedConfig.label);
+    }
+
+    @Test
+    public void testCheckConfigWithoutBitmapDictColumns() {
+        SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
+        Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
+        Deencapsulation.invoke(job, "checkConfig");
+        Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns");
+        // check bitmap dict columns empty
+        Assert.assertTrue(tableToBitmapDictColumns.isEmpty());
+    }
+
+    @Test
+    public void testCheckConfigWithBitmapDictColumns() {
+        SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json");
+        EtlTable table = etlJobConfig.tables.get(tableId);
+        table.indexes.get(0).columns.add(
+                new EtlColumn("v2", "BITMAP", false, false, "BITMAP_UNION", 
"0", 0, 0, 0)
+        );
+        EtlFileGroup fileGroup = table.fileGroups.get(0);
+        fileGroup.sourceType = EtlJobConfig.SourceType.HIVE;
+        fileGroup.columnMappings.put(
+                "v2", new EtlColumnMapping("bitmap_dict", 
Lists.newArrayList("v2"))
+        );
+        Deencapsulation.setField(job, "etlJobConfig", etlJobConfig);
+        Deencapsulation.invoke(job, "checkConfig");
+        // check hive source
+        Set<Long> hiveSourceTables = Deencapsulation.getField(job, "hiveSourceTables");
+        Assert.assertTrue(hiveSourceTables.contains(tableId));
+        // check bitmap dict columns has v2
+        Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns");
+        Assert.assertTrue(tableToBitmapDictColumns.containsKey(tableId));
+        Assert.assertTrue(tableToBitmapDictColumns.get(tableId).contains("v2"));
+        // check remove v2 bitmap_dict func mapping from file group column mappings
+        Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2"));
+    }
+}
\ No newline at end of file
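
For orientation, a hedged outline of the config-loading step that testInitConfig stubs out above: the job reads jobconfig.json from HDFS through Spark and deserializes it into an EtlJobConfig. SparkEtlJob's actual implementation is not shown in this mail, so the helper class below and the configFromJson counterpart of configToJson() are assumptions.

    import org.apache.doris.load.loadv2.etl.EtlJobConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;

    public class JobConfigLoadSketch {
        // Mirrors what the Expectations block in testInitConfig stubs:
        // spark.read().textFile(path) -> single-line JSON -> EtlJobConfig.
        public static EtlJobConfig loadConfig(SparkSession spark, String jobConfigFilePath) {
            Dataset<String> ds = spark.read().textFile(jobConfigFilePath);
            String jsonConfig = ds.first();
            // configFromJson(...) is assumed as the counterpart of configToJson() used in the test.
            return EtlJobConfig.configFromJson(jsonConfig);
        }
    }
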
diff --git a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
index c7c7b53..b4c77ea 100644
--- a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
+++ b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.AccessPrivilege;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.DomainResolver;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.persist.EditLog;
@@ -1061,7 +1062,7 @@ public class AuthTest {
         List<AccessPrivilege> usagePrivileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV);
         UserDesc userDesc = new UserDesc(userIdentity, "12345", true);
         // TODO(wyb): spark-load
-        GrantStmt.disableGrantResource = false;
+        Config.enable_spark_load = true;
 
         // ------ grant|revoke resource to|from user ------
         // 1. create user with no role


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
