This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 3f7307d [Spark Load]Add spark etl job main class (#3927) 3f7307d is described below commit 3f7307d685ac53eede87c506f257d71fdddbf648 Author: wyb <wyb...@gmail.com> AuthorDate: Wed Jun 24 13:54:55 2020 +0800 [Spark Load]Add spark etl job main class (#3927) 1. Add SparkEtlJob class 2. Remove DppResult comment 3. Support loading from hive table directly #3433 --- be/src/olap/push_handler.cpp | 6 +- be/src/olap/push_handler.h | 2 +- .../org/apache/doris/analysis/CreateTableStmt.java | 5 +- .../java/org/apache/doris/analysis/GrantStmt.java | 6 +- .../java/org/apache/doris/analysis/LoadStmt.java | 6 +- .../java/org/apache/doris/analysis/RevokeStmt.java | 3 +- .../java/org/apache/doris/catalog/Catalog.java | 13 +- .../java/org/apache/doris/catalog/ResourceMgr.java | 7 +- .../main/java/org/apache/doris/common/Config.java | 6 + .../org/apache/doris/common/FeMetaVersion.java | 4 +- .../org/apache/doris/journal/JournalEntity.java | 4 +- .../org/apache/doris/load/BrokerFileGroup.java | 8 +- .../main/java/org/apache/doris/load/EtlStatus.java | 13 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 3 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 1 - .../org/apache/doris/load/loadv2/LoadManager.java | 12 +- .../doris/load/loadv2/SparkEtlJobHandler.java | 18 +- .../org/apache/doris/load/loadv2/SparkLoadJob.java | 11 +- .../load/loadv2/dpp/DorisKryoRegistrator.java | 4 + .../doris/load/loadv2/dpp/GlobalDictBuilder.java | 6 +- .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 21 +- .../apache/doris/load/loadv2/etl/EtlJobConfig.java | 4 + .../apache/doris/load/loadv2/etl/SparkEtlJob.java | 250 +++++++++++++++++++++ .../org/apache/doris/mysql/privilege/PaloAuth.java | 9 +- .../org/apache/doris/mysql/privilege/PaloRole.java | 10 +- .../doris/persist/DropResourceOperationLog.java | 53 +++++ .../java/org/apache/doris/persist/EditLog.java | 12 +- .../java/org/apache/doris/persist/PrivInfo.java | 10 +- .../org/apache/doris/analysis/GrantStmtTest.java | 3 +- .../org/apache/doris/analysis/LoadStmtTest.java | 3 +- .../doris/load/loadv2/SparkEtlJobHandlerTest.java | 13 +- .../doris/load/loadv2/etl/SparkEtlJobTest.java | 152 +++++++++++++ .../org/apache/doris/mysql/privilege/AuthTest.java | 3 +- 33 files changed, 556 insertions(+), 125 deletions(-) diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 62e80d8..fa5c6bd 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -944,9 +944,9 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, LOG(WARNING) << "Failed to init mem trackers, msg: " << status.get_error_msg(); return OLAP_ERR_PUSH_INIT_ERROR; } - _runtime_profile.reset(_runtime_state->runtime_profile()); + _runtime_profile = _runtime_state->runtime_profile(); _runtime_profile->set_name("PushBrokerReader"); - _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker())); + _mem_tracker.reset(new MemTracker(_runtime_profile, -1, _runtime_profile->name(), _runtime_state->instance_mem_tracker())); _mem_pool.reset(new MemPool(_mem_tracker.get())); _counter.reset(new ScannerCounter()); @@ -955,7 +955,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, switch (t_scan_range.ranges[0].format_type) { case TFileFormatType::FORMAT_PARQUET: scanner = new ParquetScanner(_runtime_state.get(), - _runtime_profile.get(), + _runtime_profile, t_scan_range.params, t_scan_range.ranges, t_scan_range.broker_addresses, 
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h index a48716e..181905d 100644 --- a/be/src/olap/push_handler.h +++ b/be/src/olap/push_handler.h @@ -247,7 +247,7 @@ private: Tuple* _tuple; const Schema* _schema; std::unique_ptr<RuntimeState> _runtime_state; - std::unique_ptr<RuntimeProfile> _runtime_profile; + RuntimeProfile* _runtime_profile; std::unique_ptr<MemTracker> _mem_tracker; std::unique_ptr<MemPool> _mem_pool; std::unique_ptr<ScannerCounter> _counter; diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index ded7c78..a31983b 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -94,9 +94,6 @@ public class CreateTableStmt extends DdlStmt { // for backup. set to -1 for normal use private int tableSignature; - // TODO(wyb): spark-load - private static boolean disableHiveTable = true; - public CreateTableStmt() { // for persist tableName = new TableName(); @@ -259,7 +256,7 @@ public class CreateTableStmt extends DdlStmt { analyzeEngineName(); // TODO(wyb): spark-load - if (engineName.equals("hive") && disableHiveTable) { + if (engineName.equals("hive") && !Config.enable_spark_load) { throw new AnalysisException("Spark Load from hive table is comming soon"); } // analyze key desc diff --git a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java index 3d7674a..3df0335 100644 --- a/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/GrantStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.AccessPrivilege; import org.apache.doris.catalog.Catalog; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeNameFormat; @@ -46,9 +47,6 @@ public class GrantStmt extends DdlStmt { private ResourcePattern resourcePattern; private List<PaloPrivilege> privileges; - // TODO(wyb): spark-load - public static boolean disableGrantResource = true; - public GrantStmt(UserIdentity userIdent, String role, TablePattern tblPattern, List<AccessPrivilege> privileges) { this.userIdent = userIdent; this.role = role; @@ -111,7 +109,7 @@ public class GrantStmt extends DdlStmt { tblPattern.analyze(analyzer.getClusterName()); } else { // TODO(wyb): spark-load - if (disableGrantResource) { + if (!Config.enable_spark_load) { throw new AnalysisException("GRANT ON RESOURCE is comming soon"); } resourcePattern.analyze(); diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java index a9f87c2..b3636b6 100644 --- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -19,6 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; @@ -100,9 +101,6 @@ public class LoadStmt extends DdlStmt { private String version = "v2"; - // TODO(wyb): spark-load - public static boolean disableSparkLoad = true; - // 
properties set private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(TIMEOUT_PROPERTY) @@ -288,7 +286,7 @@ public class LoadStmt extends DdlStmt { resourceDesc.analyze(); etlJobType = resourceDesc.getEtlJobType(); // TODO(wyb): spark-load - if (disableSparkLoad) { + if (!Config.enable_spark_load) { throw new AnalysisException("Spark Load is comming soon"); } // check resource usage privilege diff --git a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java index 61dd664..9d2ce60 100644 --- a/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/RevokeStmt.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.AccessPrivilege; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; import org.apache.doris.mysql.privilege.PaloPrivilege; import org.apache.doris.mysql.privilege.PrivBitSet; @@ -98,7 +99,7 @@ public class RevokeStmt extends DdlStmt { tblPattern.analyze(analyzer.getClusterName()); } else { // TODO(wyb): spark-load - if (GrantStmt.disableGrantResource) { + if (!Config.enable_spark_load) { throw new AnalysisException("REVOKE ON RESOURCE is comming soon"); } resourcePattern.analyze(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 0d05468..4f2b1b7 100755 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1460,6 +1460,7 @@ public class Catalog { checksum = loadGlobalVariable(dis, checksum); checksum = loadCluster(dis, checksum); checksum = loadBrokers(dis, checksum); + checksum = loadResources(dis, checksum); checksum = loadExportJob(dis, checksum); checksum = loadBackupHandler(dis, checksum); checksum = loadPaloAuth(dis, checksum); @@ -1468,8 +1469,6 @@ public class Catalog { checksum = loadColocateTableIndex(dis, checksum); checksum = loadRoutineLoadJobs(dis, checksum); checksum = loadLoadJobsV2(dis, checksum); - // TODO(wyb): spark-load - //checksum = loadResources(dis, checksum); checksum = loadSmallFiles(dis, checksum); checksum = loadPlugins(dis, checksum); checksum = loadDeleteHandler(dis, checksum); @@ -1872,13 +1871,10 @@ public class Catalog { } public long loadResources(DataInputStream in, long checksum) throws IOException { - // TODO(wyb): spark-load - /* - if (MetaContext.get().getMetaVersion() >= FeMetaVersion.new_version_by_wyb) { - resourceMgr = ResourceMgr.read(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) { + resourceMgr = ResourceMgr.read(in); } LOG.info("finished replay resources from image"); - */ return checksum; } @@ -1927,6 +1923,7 @@ public class Catalog { checksum = saveGlobalVariable(dos, checksum); checksum = saveCluster(dos, checksum); checksum = saveBrokers(dos, checksum); + checksum = saveResources(dos, checksum); checksum = saveExportJob(dos, checksum); checksum = saveBackupHandler(dos, checksum); checksum = savePaloAuth(dos, checksum); @@ -1934,8 +1931,6 @@ public class Catalog { checksum = saveColocateTableIndex(dos, checksum); checksum = saveRoutineLoadJobs(dos, checksum); checksum = saveLoadJobsV2(dos, checksum); - // TODO(wyb): spark-load - //checksum = saveResources(dos, checksum); checksum = saveSmallFiles(dos, checksum); checksum 
= savePlugins(dos, checksum); checksum = saveDeleteHandler(dos, checksum); diff --git a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java index 3932cee..f5b6a9f 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java +++ b/fe/src/main/java/org/apache/doris/catalog/ResourceMgr.java @@ -27,6 +27,7 @@ import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.common.proc.ProcNodeInterface; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.DropResourceOperationLog; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; @@ -88,12 +89,12 @@ public class ResourceMgr implements Writable { } // log drop - Catalog.getCurrentCatalog().getEditLog().logDropResource(name); + Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(name)); LOG.info("drop resource success. resource name: {}", name); } - public void replayDropResource(String name) { - nameToResource.remove(name); + public void replayDropResource(DropResourceOperationLog operationLog) { + nameToResource.remove(operationLog.getName()); } public boolean containsResource(String name) { diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index ef38967..7ca2170 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -1088,5 +1088,11 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean drop_backend_after_decommission = true; + + /* + * enable spark load for temporary use + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_spark_load = false; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index dabd7a8..7c52c94 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -183,6 +183,8 @@ public final class FeMetaVersion { public static final int VERSION_85 = 85; // serialize origStmt in rollupJob and mv meta public static final int VERSION_86 = 86; + // spark resource, resource privilege, broker file group for hive table + public static final int VERSION_87 = 87; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_86; + public static final int VERSION_CURRENT = VERSION_87; } diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java index b707a15..5704bc7 100644 --- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -58,6 +58,7 @@ import org.apache.doris.persist.DatabaseInfo; import org.apache.doris.persist.DropInfo; import org.apache.doris.persist.DropLinkDbAndUpdateDbInfo; import org.apache.doris.persist.DropPartitionInfo; +import org.apache.doris.persist.DropResourceOperationLog; import org.apache.doris.persist.HbPackage; import org.apache.doris.persist.ModifyPartitionInfo; import org.apache.doris.persist.ModifyTablePropertyOperationLog; @@ -511,8 +512,7 @@ public class JournalEntity implements Writable { break; } case OperationType.OP_DROP_RESOURCE: { 
- data = new Text(); - ((Text) data).readFields(in); + data = DropResourceOperationLog.read(in); isRead = true; break; } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 78115b5..7a450a6 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -358,11 +358,8 @@ public class BrokerFileGroup implements Writable { } // src table - // TODO(wyb): spark-load - /* out.writeLong(srcTableId); out.writeBoolean(isLoadFromTable); - */ } public void readFields(DataInput in) throws IOException { @@ -414,13 +411,10 @@ public class BrokerFileGroup implements Writable { } } // src table - // TODO(wyb): spark-load - /* - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version) { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) { srcTableId = in.readLong(); isLoadFromTable = in.readBoolean(); } - */ // There are no columnExprList in the previous load job which is created before function is supported. // The columnExprList could not be analyzed without origin stmt in the previous load job. diff --git a/fe/src/main/java/org/apache/doris/load/EtlStatus.java b/fe/src/main/java/org/apache/doris/load/EtlStatus.java index bafebc4..bc01f43 100644 --- a/fe/src/main/java/org/apache/doris/load/EtlStatus.java +++ b/fe/src/main/java/org/apache/doris/load/EtlStatus.java @@ -20,7 +20,7 @@ package org.apache.doris.load; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -//import org.apache.doris.load.loadv2.dpp.DppResult; +import org.apache.doris.load.loadv2.dpp.DppResult; import org.apache.doris.thrift.TEtlState; import com.google.common.base.Strings; @@ -46,7 +46,7 @@ public class EtlStatus implements Writable { // 0 - 100 private int progress; private String failMsg; - //private DppResult dppResult; + private DppResult dppResult; public EtlStatus() { this.state = TEtlState.RUNNING; @@ -56,7 +56,7 @@ public class EtlStatus implements Writable { this.fileMap = Maps.newHashMap(); this.progress = 0; this.failMsg = ""; - //this.dppResult = null; + this.dppResult = null; } public TEtlState getState() { @@ -128,8 +128,6 @@ public class EtlStatus implements Writable { this.failMsg = failMsg; } - // TODO(wyb): spark-load - /* public DppResult getDppResult() { return dppResult; } @@ -137,7 +135,6 @@ public class EtlStatus implements Writable { public void setDppResult(DppResult dppResult) { this.dppResult = dppResult; } - */ public void reset() { this.stats.clear(); @@ -145,7 +142,7 @@ public class EtlStatus implements Writable { this.fileMap.clear(); this.progress = 0; this.failMsg = ""; - //this.dppResult = null; + this.dppResult = null; } @Override @@ -158,7 +155,7 @@ public class EtlStatus implements Writable { ", fileMap=" + fileMap + ", progress=" + progress + ", failMsg='" + failMsg + '\'' + - //", dppResult='" + dppResult + '\'' + + ", dppResult='" + dppResult + '\'' + '}'; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index a42f4e7..9ca4c77 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import 
org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; @@ -248,7 +249,7 @@ public class BrokerLoadJob extends BulkLoadJob { // check data quality if (!checkDataQuality()) { - cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG), + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), true, true); return; } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 28b556c..e9d89d7 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -82,7 +82,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements private static final Logger LOG = LogManager.getLogger(LoadJob.class); - protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel"; protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL"; protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL"; public static final String UNSELECTED_ROWS = "unselected.rows"; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index c3f25e1..9b0741b 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -17,10 +17,6 @@ package org.apache.doris.load.loadv2; -import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL; -import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL; -import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG; - import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; @@ -159,7 +155,7 @@ public class LoadManager implements Writable{ return e.getTxnId(); } catch (UserException e) { if (loadJob != null) { - loadJob.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), false, + loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false, false /* no need to write edit log, because createLoadJob log is not wrote yet */); } throw e; @@ -403,11 +399,11 @@ public class LoadManager implements Writable{ ((SparkLoadJob) job).updateEtlStatus(); } catch (DataQualityException e) { LOG.info("update load job etl status failed. job id: {}", job.getId(), e); - job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG), + job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG), true, true); } catch (UserException e) { LOG.warn("update load job etl status failed. job id: {}", job.getId(), e); - job.cancelJobWithoutCheck(new FailMsg(ETL_RUN_FAIL, e.getMessage()), true, true); + job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true); } catch (Exception e) { LOG.warn("update load job etl status failed. 
job id: {}", job.getId(), e); } @@ -422,7 +418,7 @@ public class LoadManager implements Writable{ ((SparkLoadJob) job).updateLoadingStatus(); } catch (UserException e) { LOG.warn("update load job loading status failed. job id: {}", job.getId(), e); - job.cancelJobWithoutCheck(new FailMsg(LOAD_RUN_FAIL, e.getMessage()), true, true); + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); } catch (Exception e) { LOG.warn("update load job loading status failed. job id: {}", job.getId(), e); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 84013c4..ff295d6 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -25,8 +25,9 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.load.EtlStatus; -//import org.apache.doris.load.loadv2.dpp.DppResult; +import org.apache.doris.load.loadv2.dpp.DppResult; import org.apache.doris.load.loadv2.etl.EtlJobConfig; +import org.apache.doris.load.loadv2.etl.SparkEtlJob; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TEtlState; @@ -46,12 +47,12 @@ import org.apache.spark.launcher.SparkAppHandle.Listener; import org.apache.spark.launcher.SparkAppHandle.State; import org.apache.spark.launcher.SparkLauncher; -//import com.google.common.base.Strings; +import com.google.common.base.Strings; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -//import com.google.gson.Gson; -//import com.google.gson.JsonSyntaxException; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -73,7 +74,6 @@ public class SparkEtlJobHandler { private static final String CONFIG_FILE_NAME = "jobconfig.json"; private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME; private static final String JOB_CONFIG_DIR = "configs"; - private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob"; private static final String ETL_JOB_NAME = "doris__%s"; // 5min private static final int GET_APPID_MAX_RETRY_TIMES = 300; @@ -112,10 +112,7 @@ public class SparkEtlJobHandler { launcher.setMaster(resource.getMaster()) .setDeployMode(resource.getDeployMode().name().toLowerCase()) .setAppResource(appResourceHdfsPath) - // TODO(wyb): spark-load - // replace with getCanonicalName later - //.setMainClass(SparkEtlJob.class.getCanonicalName()) - .setMainClass(MAIN_CLASS) + .setMainClass(SparkEtlJob.class.getCanonicalName()) .setAppName(String.format(ETL_JOB_NAME, loadLabel)) .addAppArgs(jobConfigHdfsPath); // spark configs @@ -220,8 +217,6 @@ public class SparkEtlJobHandler { if (status.getState() == TEtlState.FINISHED || status.getState() == TEtlState.CANCELLED) { // get dpp result - // TODO(wyb): spark-load - /* String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath); try { byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc); @@ -234,7 +229,6 @@ public class SparkEtlJobHandler { } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) { LOG.warn("read broker file failed. 
path: {}", dppResultFilePath, e); } - */ } return status; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 715b6af..f1dff69 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -17,8 +17,6 @@ package org.apache.doris.load.loadv2; -import static org.apache.doris.common.DataQualityException.QUALITY_FAIL_MSG; - import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.CastExpr; import org.apache.doris.analysis.DescriptorTable; @@ -58,7 +56,7 @@ import org.apache.doris.common.util.LogKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; -//import org.apache.doris.load.loadv2.dpp.DppResult; +import org.apache.doris.load.loadv2.dpp.DppResult; import org.apache.doris.load.loadv2.etl.EtlJobConfig; import org.apache.doris.qe.OriginStatement; import org.apache.doris.service.FrontendOptions; @@ -76,7 +74,7 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPriority; import org.apache.doris.thrift.TPushType; -//import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TabletQuorumFailedException; @@ -313,8 +311,6 @@ public class SparkLoadJob extends BulkLoadJob { loadingStatus.setTrackingUrl(appId); } - // TODO(wyb): spark-load - /* DppResult dppResult = etlStatus.getDppResult(); if (dppResult != null) { // update load statistic and counters when spark etl job finished @@ -331,14 +327,13 @@ public class SparkLoadJob extends BulkLoadJob { counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows)); counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows)); } - */ } private void unprotectedProcessEtlFinish(EtlStatus etlStatus, SparkEtlJobHandler handler) throws Exception { unprotectedUpdateEtlStatusInternal(etlStatus); // checkDataQuality if (!checkDataQuality()) { - throw new DataQualityException(QUALITY_FAIL_MSG); + throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG); } // get etl output files and update loading state diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java index d2568bb..f0528ec 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java @@ -20,10 +20,14 @@ package org.apache.doris.load.loadv2.dpp; import com.esotericsoftware.kryo.Kryo; import org.apache.spark.serializer.KryoRegistrator; +/** + * register etl classes with Kryo when using Kryo serialization. 
+ */ public class DorisKryoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class); + kryo.register(org.apache.doris.load.loadv2.BitmapValue.class); } } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java index b220b40..8275edf 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalog.Column; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Arrays; @@ -200,7 +200,7 @@ public class GlobalDictBuilder { maxDictValue = (long)row.get(0); minDictValue = (long)row.get(1); } - LOG.info(" column {} 's max value in dict is {} , min value is {}", distinctColumnNameTmp, maxDictValue, minDictValue); + LOG.info(" column " + distinctColumnNameTmp + " 's max value in dict is " + maxDictValue + ", min value is " + minDictValue); // maybe never happened, but we need detect it if (minDictValue < 0) { throw new RuntimeException(String.format(" column %s 's cardinality has exceed bigint's max value", distinctColumnNameTmp)); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java index 80becc4..3bc8c95 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java @@ -746,11 +746,18 @@ public final class SparkDpp implements java.io.Serializable { } private Dataset<Row> loadDataFromHiveTable(SparkSession spark, - String hiveTableName, + String hiveDbTableName, EtlJobConfig.EtlIndex baseIndex, EtlJobConfig.EtlFileGroup fileGroup, StructType dstTableSchema) throws UserException { - Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName); + // select base index columns from hive table + StringBuilder sql = new StringBuilder(); + sql.append("select "); + baseIndex.columns.forEach(column -> { + sql.append(column.columnName).append(","); + }); + sql.deleteCharAt(sql.length() - 1).append(" from ").append(hiveDbTableName); + Dataset<Row> dataframe = spark.sql(sql.toString()); dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup); return dataframe; } @@ -805,13 +812,13 @@ public final class SparkDpp implements java.io.Serializable { for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) { List<String> filePaths = fileGroup.filePaths; Dataset<Row> fileGroupDataframe = null; - if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) { + EtlJobConfig.SourceType sourceType = fileGroup.sourceType; + if (sourceType == EtlJobConfig.SourceType.FILE) { fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema); + } else if (sourceType == EtlJobConfig.SourceType.HIVE) { + fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema); } else { - String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 
1); - String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, - tableId, taskId); - fileGroupDataframe = loadDataFromHiveTable(spark, dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema); + throw new RuntimeException("Unknown source type: " + sourceType.name()); } if (fileGroupDataframe == null) { LOG.info("no data for file file group:" + fileGroup); diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java index b9767f7..6e6756c 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java @@ -486,6 +486,10 @@ public class EtlJobConfig implements Serializable { @SerializedName(value = "hiveTableProperties") public Map<String, String> hiveTableProperties; + // hive db table used in dpp, not serialized + // set with hiveDbTableName (no bitmap column) or IntermediateHiveTable (created by global dict builder) in spark etl job + public String dppHiveDbTableName; + // for data infile path public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String> fileFieldNames, List<String> columnsFromPath, String columnSeparator, String lineDelimiter, diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java new file mode 100644 index 0000000..6fe8291 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2.etl; + +import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder; +import org.apache.doris.load.loadv2.dpp.SparkDpp; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; + +import org.apache.commons.collections.map.MultiValueMap; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * SparkEtlJob is responsible for global dict building, data partition, data sort and data aggregation. + * 1. init job config + * 2. 
check if job has bitmap_dict function columns + * 3. build global dict if step 2 is true + * 4. dpp (data partition, data sort and data aggregation) + */ +public class SparkEtlJob { + private static final Logger LOG = LogManager.getLogger(SparkEtlJob.class); + + private static final String BITMAP_DICT_FUNC = "bitmap_dict"; + private static final String TO_BITMAP_FUNC = "to_bitmap"; + + private String jobConfigFilePath; + private EtlJobConfig etlJobConfig; + private Set<Long> hiveSourceTables; + private Map<Long, Set<String>> tableToBitmapDictColumns; + private SparkSession spark; + + private SparkEtlJob(String jobConfigFilePath) { + this.jobConfigFilePath = jobConfigFilePath; + this.etlJobConfig = null; + this.hiveSourceTables = Sets.newHashSet(); + this.tableToBitmapDictColumns = Maps.newHashMap(); + } + + private void initSparkEnvironment() { + SparkConf conf = new SparkConf(); + //serialization conf + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator"); + conf.set("spark.kryo.registrationRequired", "false"); + spark = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate(); + } + + private void initSparkConfigs(Map<String, String> configs) { + if (configs == null) { + return; + } + for (Map.Entry<String, String> entry : configs.entrySet()) { + spark.sparkContext().conf().set(entry.getKey(), entry.getValue()); + } + } + + private void initConfig() { + LOG.info("job config file path: " + jobConfigFilePath); + Dataset<String> ds = spark.read().textFile(jobConfigFilePath); + String jsonConfig = ds.first(); + LOG.info("rdd read json config: " + jsonConfig); + etlJobConfig = EtlJobConfig.configFromJson(jsonConfig); + LOG.info("etl job config: " + etlJobConfig); + } + + /* + * 1. check bitmap column + * 2. fill tableToBitmapDictColumns + * 3. remove bitmap_dict and to_bitmap mapping from columnMappings + */ + private void checkConfig() throws Exception { + for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) { + boolean isHiveSource = false; + Set<String> bitmapDictColumns = Sets.newHashSet(); + for (EtlFileGroup fileGroup : entry.getValue().fileGroups) { + if (fileGroup.sourceType == EtlJobConfig.SourceType.HIVE) { + isHiveSource = true; + } + Map<String, EtlColumnMapping> newColumnMappings = Maps.newHashMap(); + for (Map.Entry<String, EtlColumnMapping> mappingEntry : fileGroup.columnMappings.entrySet()) { + String columnName = mappingEntry.getKey(); + String exprStr = mappingEntry.getValue().toDescription(); + String funcName = functions.expr(exprStr).expr().prettyName(); + if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) { + bitmapDictColumns.add(columnName); + } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) { + newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue()); + } + } + // reset new columnMappings + fileGroup.columnMappings = newColumnMappings; + } + if (isHiveSource) { + hiveSourceTables.add(entry.getKey()); + } + if (!bitmapDictColumns.isEmpty()) { + tableToBitmapDictColumns.put(entry.getKey(), bitmapDictColumns); + } + } + LOG.info("init hiveSourceTables: " + hiveSourceTables + ", tableToBitmapDictColumns: " + tableToBitmapDictColumns); + + // spark etl must have only one table with bitmap type column to process. 
+ if (hiveSourceTables.size() > 1 || tableToBitmapDictColumns.size() > 1) { + throw new Exception("spark etl job must have only one hive table with bitmap type column to process"); + } + } + + private void processDpp() throws Exception { + SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig); + sparkDpp.init(); + sparkDpp.doDpp(); + } + + private String buildGlobalDictAndEncodeSourceTable(EtlTable table, long tableId) { + // dict column map + MultiValueMap dictColumnMap = new MultiValueMap(); + for (String dictColumn : tableToBitmapDictColumns.get(tableId)) { + dictColumnMap.put(dictColumn, null); + } + + // doris schema + List<String> dorisOlapTableColumnList = Lists.newArrayList(); + for (EtlIndex etlIndex : table.indexes) { + if (etlIndex.isBaseIndex) { + for (EtlColumn column : etlIndex.columns) { + dorisOlapTableColumnList.add(column.columnName); + } + } + } + + // hive db and tables + EtlFileGroup fileGroup = table.fileGroups.get(0); + String sourceHiveDBTableName = fileGroup.hiveDbTableName; + String dorisHiveDB = sourceHiveDBTableName.split("\\.")[0]; + String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1); + String globalDictTableName = String.format(EtlJobConfig.GLOBAL_DICT_TABLE_NAME, tableId); + String distinctKeyTableName = String.format(EtlJobConfig.DISTINCT_KEY_TABLE_NAME, tableId, taskId); + String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME, tableId, taskId); + String sourceHiveFilter = fileGroup.where; + + // others + List<String> mapSideJoinColumns = Lists.newArrayList(); + int buildConcurrency = 1; + List<String> veryHighCardinalityColumn = Lists.newArrayList(); + int veryHighCardinalityColumnSplitNum = 1; + + LOG.info("global dict builder args, dictColumnMap: " + dictColumnMap + + ", dorisOlapTableColumnList: " + dorisOlapTableColumnList + + ", sourceHiveDBTableName: " + sourceHiveDBTableName + + ", sourceHiveFilter: "+ sourceHiveFilter + + ", distinctKeyTableName: " + distinctKeyTableName + + ", globalDictTableName: " + globalDictTableName + + ", dorisIntermediateHiveTable: " + dorisIntermediateHiveTable); + try { + GlobalDictBuilder globalDictBuilder = new GlobalDictBuilder( + dictColumnMap, dorisOlapTableColumnList, mapSideJoinColumns, sourceHiveDBTableName, + sourceHiveFilter, dorisHiveDB, distinctKeyTableName, globalDictTableName, dorisIntermediateHiveTable, + buildConcurrency, veryHighCardinalityColumn, veryHighCardinalityColumnSplitNum, spark); + globalDictBuilder.createHiveIntermediateTable(); + globalDictBuilder.extractDistinctColumn(); + globalDictBuilder.buildGlobalDict(); + globalDictBuilder.encodeDorisIntermediateHiveTable(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return String.format("%s.%s", dorisHiveDB, dorisIntermediateHiveTable); + } + + private void processData() throws Exception { + if (!hiveSourceTables.isEmpty()) { + // only one table + long tableId = -1; + EtlTable table = null; + for (Map.Entry<Long, EtlTable> entry : etlJobConfig.tables.entrySet()) { + tableId = entry.getKey(); + table = entry.getValue(); + break; + } + + // init hive configs like metastore service + EtlFileGroup fileGroup = table.fileGroups.get(0); + initSparkConfigs(fileGroup.hiveTableProperties); + fileGroup.dppHiveDbTableName = fileGroup.hiveDbTableName; + + // build global dict and encode source hive table if has bitmap dict columns + if (!tableToBitmapDictColumns.isEmpty() && tableToBitmapDictColumns.containsKey(tableId)) { + String 
dorisIntermediateHiveDbTableName = buildGlobalDictAndEncodeSourceTable(table, tableId); + // set with dorisIntermediateHiveDbTable + fileGroup.dppHiveDbTableName = dorisIntermediateHiveDbTableName; + } + } + + // data partition sort and aggregation + processDpp(); + } + + private void run() throws Exception { + initSparkEnvironment(); + initConfig(); + checkConfig(); + processData(); + } + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("missing job config file path arg"); + System.exit(-1); + } + + try { + new SparkEtlJob(args[0]).run(); + } catch (Exception e) { + System.err.println("spark etl job run failed"); + e.printStackTrace(); + System.exit(-1); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java index 19610c7..bfd2e32 100644 --- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java +++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java @@ -36,6 +36,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Writable; import org.apache.doris.load.DppConfig; @@ -1312,8 +1313,7 @@ public class PaloAuth implements Writable { userPrivTable.write(out); dbPrivTable.write(out); tablePrivTable.write(out); - // TODO(wyb): spark-load - //resourcePrivTable.write(out); + resourcePrivTable.write(out); propertyMgr.write(out); } @@ -1322,12 +1322,9 @@ public class PaloAuth implements Writable { userPrivTable = (UserPrivTable) PrivTable.read(in); dbPrivTable = (DbPrivTable) PrivTable.read(in); tablePrivTable = (TablePrivTable) PrivTable.read(in); - // TODO(wyb): spark-load - /* - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) { resourcePrivTable = (ResourcePrivTable) PrivTable.read(in); } - */ propertyMgr = UserPropertyMgr.read(in); if (userPrivTable.isEmpty()) { diff --git a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java index 7d2f123..d9d77e4 100644 --- a/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java +++ b/fe/src/main/java/org/apache/doris/mysql/privilege/PaloRole.java @@ -20,6 +20,8 @@ package org.apache.doris.mysql.privilege; import org.apache.doris.analysis.ResourcePattern; import org.apache.doris.analysis.TablePattern; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -142,14 +144,11 @@ public class PaloRole implements Writable { entry.getKey().write(out); entry.getValue().write(out); } - // TODO(wyb): spark-load - /* out.writeInt(resourcePatternToPrivs.size()); for (Map.Entry<ResourcePattern, PrivBitSet> entry : resourcePatternToPrivs.entrySet()) { entry.getKey().write(out); entry.getValue().write(out); } - */ out.writeInt(users.size()); for (UserIdentity userIdentity : users) { userIdentity.write(out); @@ -164,9 +163,7 @@ public class PaloRole implements Writable { PrivBitSet privs = PrivBitSet.read(in); tblPatternToPrivs.put(tblPattern, privs); } - // TODO(wyb): spark-load - /* - if 
(Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) { size = in.readInt(); for (int i = 0; i < size; i++) { ResourcePattern resourcePattern = ResourcePattern.read(in); @@ -174,7 +171,6 @@ public class PaloRole implements Writable { resourcePatternToPrivs.put(resourcePattern, privs); } } - */ size = in.readInt(); for (int i = 0; i < size; i++) { UserIdentity userIdentity = UserIdentity.read(in); diff --git a/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java new file mode 100644 index 0000000..8c69763 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/persist/DropResourceOperationLog.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * For resource drop + */ +public class DropResourceOperationLog implements Writable { + @SerializedName(value = "name") + private String name; + + public DropResourceOperationLog(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static DropResourceOperationLog read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), DropResourceOperationLog.class); + } +} diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 7f3da6c..14bb416 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -696,8 +696,8 @@ public class EditLog { break; } case OperationType.OP_DROP_RESOURCE: { - final String resourceName = journal.getData().toString(); - catalog.getResourceMgr().replayDropResource(resourceName); + final DropResourceOperationLog operationLog = (DropResourceOperationLog) journal.getData(); + catalog.getResourceMgr().replayDropResource(operationLog); break; } case OperationType.OP_CREATE_SMALL_FILE: { @@ -1277,13 +1277,11 @@ public class EditLog { } public void logCreateResource(Resource resource) { - // TODO(wyb): spark-load - //logEdit(OperationType.OP_CREATE_RESOURCE, resource); + logEdit(OperationType.OP_CREATE_RESOURCE, resource); } - public void logDropResource(String resourceName) { - // TODO(wyb): spark-load - 
//logEdit(OperationType.OP_DROP_RESOURCE, new Text(resourceName)); + public void logDropResource(DropResourceOperationLog operationLog) { + logEdit(OperationType.OP_DROP_RESOURCE, operationLog); } public void logCreateSmallFile(SmallFile info) { diff --git a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java index 26b7394..b512fa6 100644 --- a/fe/src/main/java/org/apache/doris/persist/PrivInfo.java +++ b/fe/src/main/java/org/apache/doris/persist/PrivInfo.java @@ -20,6 +20,8 @@ package org.apache.doris.persist; import org.apache.doris.analysis.ResourcePattern; import org.apache.doris.analysis.TablePattern; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.mysql.privilege.PrivBitSet; @@ -117,15 +119,12 @@ public class PrivInfo implements Writable { out.writeBoolean(false); } - // TODO(wyb): spark-load - /* if (resourcePattern != null) { out.writeBoolean(true); resourcePattern.write(out); } else { out.writeBoolean(false); } - */ if (privs != null) { out.writeBoolean(true); @@ -159,14 +158,11 @@ public class PrivInfo implements Writable { tblPattern = TablePattern.read(in); } - // TODO(wyb): spark-load - /* - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.new_version_by_wyb) { + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_87) { if (in.readBoolean()) { resourcePattern = ResourcePattern.read(in); } } - */ if (in.readBoolean()) { privs = PrivBitSet.read(in); diff --git a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java index dc8e1f4..7907433 100644 --- a/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/GrantStmtTest.java @@ -21,6 +21,7 @@ import mockit.Expectations; import org.apache.doris.catalog.AccessPrivilege; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.qe.ConnectContext; @@ -97,7 +98,7 @@ public class GrantStmtTest { @Test public void testResourceNormal() throws UserException { // TODO(wyb): spark-load - GrantStmt.disableGrantResource = false; + Config.enable_spark_load = true; String resourceName = "spark0"; List<AccessPrivilege> privileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV); diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 993fda1..250f022 100644 --- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.ResourceMgr; import org.apache.doris.catalog.SparkResource; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.EtlJobType; import org.apache.doris.mysql.privilege.PaloAuth; @@ -107,7 +108,7 @@ public class LoadStmtTest { // test ResourceDesc // TODO(wyb): spark-load - LoadStmt.disableSparkLoad = false; + Config.enable_spark_load = true; stmt = new LoadStmt(new 
LabelName("testDb", "testLabel"), dataDescriptionList, new ResourceDesc(resourceName, null), null); stmt.analyze(analyzer); diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java index d288610..02a690b 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java @@ -157,9 +157,8 @@ public class SparkEtlJobHandlerTest { result = trackingUrl; report.getProgress(); returns(0.5f, 1f, 1f); - // TODO(wyb): spark-load - //BrokerUtil.readFile(anyString, (BrokerDesc) any); - //result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}"; + BrokerUtil.readFile(anyString, (BrokerDesc) any); + result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}"; } }; @@ -180,17 +179,15 @@ public class SparkEtlJobHandlerTest { status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc); Assert.assertEquals(TEtlState.CANCELLED, status.getState()); Assert.assertEquals(100, status.getProgress()); - // TODO(wyb): spark-load - //Assert.assertEquals("etl job failed", status.getDppResult().failedReason); + Assert.assertEquals("etl job failed", status.getDppResult().failedReason); // finished status = handler.getEtlJobStatus(null, appId, loadJobId, etlOutputPath, resource, brokerDesc); Assert.assertEquals(TEtlState.FINISHED, status.getState()); Assert.assertEquals(100, status.getProgress()); Assert.assertEquals(trackingUrl, status.getTrackingUrl()); - // TODO(wyb): spark-load - //Assert.assertEquals(10, status.getDppResult().normalRows); - //Assert.assertEquals(0, status.getDppResult().abnormalRows); + Assert.assertEquals(10, status.getDppResult().normalRows); + Assert.assertEquals(0, status.getDppResult().abnormalRows); } @Test diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java new file mode 100644 index 0000000..681e027 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/load/loadv2/etl/SparkEtlJobTest.java @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.loadv2.etl; + +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo; +import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SparkEtlJobTest { + private long tableId; + private long index1Id; + private long index2Id; + private long partition1Id; + private long partition2Id; + private EtlJobConfig etlJobConfig; + + @Before + public void setUp() { + tableId = 0L; + index1Id = 1L; + index2Id = 2L; + partition1Id = 3L; + partition2Id = 4L; + + // indexes + EtlColumn k1 = new EtlColumn("k1", "INT", false, true, "NONE", "0", 0, 0, 0); + EtlColumn k2 = new EtlColumn("k2", "VARCHAR", false, true, "NONE", "0", 10, 0, 0); + EtlColumn v1 = new EtlColumn("v1", "BIGINT", false, false, "NONE", "0", 0, 0, 0); + EtlIndex index1 = new EtlIndex(index1Id, Lists.newArrayList(k1, k2, v1), 666666, "DUPLICATE", true); + v1 = new EtlColumn("v1", "BIGINT", false, false, "SUM", "0", 0, 0, 0); + EtlIndex index2 = new EtlIndex(index2Id, Lists.newArrayList(k1, v1), 888888, "AGGREGATE", true); + List<EtlIndex> indexes = Lists.newArrayList(index1, index2); + // partition info + List<EtlPartition> partitions = Lists.newArrayList(); + partitions.add(new EtlPartition(partition1Id, Lists.newArrayList(0), Lists.newArrayList(100), false, 2)); + partitions.add(new EtlPartition(partition2Id, Lists.newArrayList(100), Lists.newArrayList(), true, 3)); + EtlPartitionInfo partitionInfo = new EtlPartitionInfo("RANGE", Lists.newArrayList("k1"), Lists.newArrayList("k2"), partitions); + EtlTable table = new EtlTable(indexes, partitionInfo); + // file group + Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap(); + columnMappings.put("k1", new EtlColumnMapping("k1 + 1")); + table.addFileGroup(new EtlFileGroup(EtlJobConfig.SourceType.FILE, Lists.newArrayList("hdfs://127.0.0.1:10000/file"), + Lists.newArrayList(), Lists.newArrayList(), "\t", "\n", false, null, + Maps.newHashMap(), "", Lists.newArrayList(partition1Id, partition2Id))); + // tables + Map<Long, EtlTable> tables = Maps.newHashMap(); + tables.put(tableId, table); + // others + String outputFilePattern = "V1.label0.%d.%d.%d.%d.%d.parquet"; + String label = "label0"; + EtlJobProperty properties = new EtlJobProperty(); + properties.strictMode = false; + properties.timezone = "Asia/Shanghai"; + etlJobConfig = new EtlJobConfig(tables, outputFilePattern, label, properties); + } + + @Test + public void testInitConfig(@Mocked SparkSession spark, @Injectable Dataset<String> ds) { + new Expectations() { + { + SparkSession.builder().enableHiveSupport().getOrCreate(); + result = spark; + spark.read().textFile(anyString); + result = ds; + ds.first(); + result = 
etlJobConfig.configToJson(); + } + }; + + SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json"); + Deencapsulation.invoke(job, "initSparkEnvironment"); + Deencapsulation.invoke(job, "initConfig"); + EtlJobConfig parsedConfig = Deencapsulation.getField(job, "etlJobConfig"); + Assert.assertTrue(parsedConfig.tables.containsKey(tableId)); + EtlTable table = parsedConfig.tables.get(tableId); + Assert.assertEquals(2, table.indexes.size()); + Assert.assertEquals(2, table.partitionInfo.partitions.size()); + Assert.assertEquals(false, parsedConfig.properties.strictMode); + Assert.assertEquals("label0", parsedConfig.label); + } + + @Test + public void testCheckConfigWithoutBitmapDictColumns() { + SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json"); + Deencapsulation.setField(job, "etlJobConfig", etlJobConfig); + Deencapsulation.invoke(job, "checkConfig"); + Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns"); + // check bitmap dict columns empty + Assert.assertTrue(tableToBitmapDictColumns.isEmpty()); + } + + @Test + public void testCheckConfigWithBitmapDictColumns() { + SparkEtlJob job = Deencapsulation.newInstance(SparkEtlJob.class, "hdfs://127.0.0.1:10000/jobconfig.json"); + EtlTable table = etlJobConfig.tables.get(tableId); + table.indexes.get(0).columns.add( + new EtlColumn("v2", "BITMAP", false, false, "BITMAP_UNION", "0", 0, 0, 0) + ); + EtlFileGroup fileGroup = table.fileGroups.get(0); + fileGroup.sourceType = EtlJobConfig.SourceType.HIVE; + fileGroup.columnMappings.put( + "v2", new EtlColumnMapping("bitmap_dict", Lists.newArrayList("v2")) + ); + Deencapsulation.setField(job, "etlJobConfig", etlJobConfig); + Deencapsulation.invoke(job, "checkConfig"); + // check hive source + Set<Long> hiveSourceTables = Deencapsulation.getField(job, "hiveSourceTables"); + Assert.assertTrue(hiveSourceTables.contains(tableId)); + // check bitmap dict columns has v2 + Map<Long, Set<String>> tableToBitmapDictColumns = Deencapsulation.getField(job, "tableToBitmapDictColumns"); + Assert.assertTrue(tableToBitmapDictColumns.containsKey(tableId)); + Assert.assertTrue(tableToBitmapDictColumns.get(tableId).contains("v2")); + // check remove v2 bitmap_dict func mapping from file group column mappings + Assert.assertFalse(table.fileGroups.get(0).columnMappings.containsKey("v2")); + } +} \ No newline at end of file diff --git a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java index c7c7b53..b4c77ea 100644 --- a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java +++ b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java @@ -32,6 +32,7 @@ import org.apache.doris.catalog.AccessPrivilege; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.DomainResolver; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.persist.EditLog; @@ -1061,7 +1062,7 @@ public class AuthTest { List<AccessPrivilege> usagePrivileges = Lists.newArrayList(AccessPrivilege.USAGE_PRIV); UserDesc userDesc = new UserDesc(userIdentity, "12345", true); // TODO(wyb): spark-load - GrantStmt.disableGrantResource = false; + Config.enable_spark_load = true; // ------ grant|revoke resource to|from user ------ // 1. 
create user with no role