This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch 2.5.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c40188f40de14f7800dab58632e30b3ee42b6f39 Author: nichunen <chunen...@kyligence.io> AuthorDate: Mon Sep 3 15:34:10 2018 +0800 KYLIN-3515 Add uuid for materialized table of hive view --- .../src/main/java/org/apache/kylin/cube/CubeManager.java | 8 ++++---- .../apache/kylin/cube/cli/DictionaryGeneratorCLI.java | 16 +++++++++------- .../java/org/apache/kylin/dict/lookup/SnapshotCLI.java | 5 ++++- .../apache/kylin/job/execution/AbstractExecutable.java | 8 ++++++++ .../kylin/job/execution/DefaultChainedExecutable.java | 4 +++- .../apache/kylin/job/execution/ExecutableManager.java | 4 ++++ .../java/org/apache/kylin/metadata/model/TableDesc.java | 8 ++++++++ .../src/main/java/org/apache/kylin/source/ISource.java | 2 +- .../main/java/org/apache/kylin/source/SourceManager.java | 4 ++-- .../main/java/org/apache/kylin/engine/mr/IMRInput.java | 2 +- .../org/apache/kylin/engine/mr/JobBuilderSupport.java | 1 + .../src/main/java/org/apache/kylin/engine/mr/MRUtil.java | 8 ++++---- .../kylin/engine/mr/steps/CreateDictionaryJob.java | 4 +++- .../mr/steps/lookup/LookupSnapshotToMetaStoreStep.java | 2 +- .../apache/kylin/source/hive/ITSnapshotManagerTest.java | 2 +- .../main/java/org/apache/kylin/rest/msg/CnMessage.java | 4 ++++ .../src/main/java/org/apache/kylin/rest/msg/Message.java | 4 ++++ .../java/org/apache/kylin/rest/service/CubeService.java | 9 ++++++++- .../java/org/apache/kylin/rest/service/JobService.java | 7 +++++++ .../java/org/apache/kylin/rest/service/TableService.java | 2 +- .../java/org/apache/kylin/source/hive/HiveInputBase.java | 10 +++++----- .../java/org/apache/kylin/source/hive/HiveMRInput.java | 13 ++++++++----- .../java/org/apache/kylin/source/hive/HiveSource.java | 4 ++-- .../org/apache/kylin/source/hive/HiveSparkInput.java | 6 ++++-- .../source/hive/cardinality/ColumnCardinalityMapper.java | 2 +- .../hive/cardinality/HiveColumnCardinalityJob.java | 6 ++++-- .../java/org/apache/kylin/source/jdbc/JdbcSource.java | 2 +- .../java/org/apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../java/org/apache/kylin/source/kafka/KafkaSource.java | 2 +- .../kylin/storage/hbase/lookup/HBaseLookupMRSteps.java | 2 +- .../storage/hbase/lookup/LookupTableToHFileJob.java | 5 +++-- .../storage/hbase/lookup/LookupTableToHFileMapper.java | 4 +++- 32 files changed, 112 insertions(+), 50 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 2a56941..0dc825f 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -1028,8 +1028,8 @@ public class CubeManager implements IRealizationProvider { return dictAssist.getDictionary(cubeSeg, col); } - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { - return dictAssist.buildSnapshotTable(cubeSeg, lookupTable); + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException { + return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid); } private TableMetadataManager getMetadataManager() { @@ -1103,7 +1103,7 @@ public class CubeManager implements IRealizationProvider { return (Dictionary<String>) info.getDictionaryObject(); } - public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { + public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException { // work on copy instead of cached objects CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid()); @@ -1112,7 +1112,7 @@ public class CubeManager implements IRealizationProvider { SnapshotManager snapshotMgr = getSnapshotManager(); TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject())); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig()); CubeDesc cubeDesc = cubeSeg.getCubeDesc(); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 7fcf320..6de42ac 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -42,26 +42,28 @@ public class DictionaryGeneratorCLI { private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class); - public static void processSegment(KylinConfig config, String cubeName, String segmentID, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { + public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid, + DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); CubeSegment segment = cube.getSegmentById(segmentID); - processSegment(config, segment, factTableValueProvider, dictProvider); + processSegment(config, segment, uuid, factTableValueProvider, dictProvider); } - private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { + private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid, + DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(config); // dictionary for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) { logger.info("Building dictionary for " + col); IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col); - + Dictionary<String> preBuiltDict = null; if (dictProvider != null) { preBuiltDict = dictProvider.getDictionary(col); } - + if (preBuiltDict != null) { logger.debug("Dict for '" + col.getName() + "' has already been built, save it"); cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict); @@ -87,9 +89,9 @@ public class DictionaryGeneratorCLI { for (String tableIdentity : toSnapshot) { logger.info("Building snapshot of " + tableIdentity); - cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity); + cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid); } - + CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName()); cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid()); for (TableRef lookup : toCheckLookup) { diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java index f965d18..e30d156 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java @@ -42,7 +42,10 @@ public class SnapshotCLI { if (tableDesc == null) throw new IllegalArgumentException("Not table found by " + table); - SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc), tableDesc, overwriteUUID); + if (tableDesc.isView()) + throw new IllegalArgumentException("Build snapshot of hive view \'" + table + "\' not supported."); + + SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(SourceManager.createReadableTable(tableDesc, null), tableDesc, overwriteUUID); System.out.println("resource path updated: " + snapshot.getResourcePath()); } } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index dbbfc39..ad22abc 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -62,6 +62,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { private KylinConfig config; private String name; private String id; + private AbstractExecutable parentExecutable = null; private Map<String, String> params = Maps.newHashMap(); public AbstractExecutable() { @@ -396,6 +397,13 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } } + public AbstractExecutable getParentExecutable() { + return parentExecutable; + } + public void setParentExecutable(AbstractExecutable parentExecutable) { + this.parentExecutable = parentExecutable; + } + public static long getExtraInfoAsLong(Output output, String key, long defaultValue) { final String str = output.getExtra().get(key); if (str != null) { diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 2297be7..a8a91fd 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.execution; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import org.apache.kylin.common.KylinConfig; @@ -169,7 +170,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai @Override public void addTask(AbstractExecutable executable) { - executable.setId(getId() + "-" + String.format("%02d", subTasks.size())); + executable.setParentExecutable(this); + executable.setId(getId() + "-" + String.format(Locale.ROOT, "%02d", subTasks.size())); this.subTasks.add(executable); } diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index d37b3da..788a7fb 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -530,6 +530,10 @@ public class ExecutableManager { if (tasks != null && !tasks.isEmpty()) { Preconditions.checkArgument(result instanceof ChainedExecutable); for (ExecutablePO subTask : tasks) { + AbstractExecutable subTaskExecutable = parseTo(subTask); + if (subTaskExecutable != null) { + subTaskExecutable.setParentExecutable(result); + } ((ChainedExecutable) result).addTask(parseTo(subTask)); } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index f69b05d..3f9a774 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -359,6 +359,14 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name; } + public String getMaterializedName(String uuid) { + if (uuid == null) { + return getMaterializedName(); + } else + return MetadataConstants.KYLIN_INTERMEDIATE_PREFIX + database.getName() + "_" + name + "_" + + uuid.replaceAll("-", "_"); + } + @Override public String toString() { return "TableDesc{" + "name='" + name + '\'' + ", columns=" + Arrays.toString(columns) + ", sourceType=" diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index f79d0f0..43df3f1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -43,7 +43,7 @@ public interface ISource extends Closeable { /** * Return a ReadableTable that can iterate through the rows of given table. */ - IReadableTable createReadableTable(TableDesc tableDesc); + IReadableTable createReadableTable(TableDesc tableDesc, String uuid); /** * Give the source a chance to enrich a SourcePartition before build start. diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java index 62c4368..03559bc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceManager.java @@ -140,8 +140,8 @@ public class SourceManager { }); } - public static IReadableTable createReadableTable(TableDesc table) { - return getSource(table).createReadableTable(table); + public static IReadableTable createReadableTable(TableDesc table, String uuid) { + return getSource(table).createReadableTable(table, uuid); } public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index f650321..c259c4e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -36,7 +36,7 @@ public interface IMRInput { public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc); /** Return an InputFormat that reads from specified table. */ - public IMRTableInputFormat getTableInputFormat(TableDesc table); + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid); /** Return a helper to participate in batch cubing merge job flow. */ public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 5b1f38c..5f27bf8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -170,6 +170,7 @@ public class JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); buildDictionaryStep.setJobParams(cmd.toString()); buildDictionaryStep.setJobClass(CreateDictionaryJob.class); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 85a425c..3a0fb84 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -42,13 +42,13 @@ public class MRUtil { return SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc); } - public static IMRTableInputFormat getTableInputFormat(String tableName, String prj) { + public static IMRTableInputFormat getTableInputFormat(String tableName, String prj, String uuid) { TableDesc t = getTableDesc(tableName, prj); - return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t); + return SourceManager.createEngineAdapter(t, IMRInput.class).getTableInputFormat(t, uuid); } - public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) { - return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc); + public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc, String uuid) { + return SourceManager.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc, uuid); } private static TableDesc getTableDesc(String tableName, String prj) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index e01da9e..aeb7b12 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@ -59,16 +59,18 @@ public class CreateDictionaryJob extends AbstractHadoopJob { options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_DICT_PATH); + options.addOption(OPTION_CUBING_JOB_ID); parseOptions(options, args); final String cubeName = getOptionValue(OPTION_CUBE_NAME); final String segmentID = getOptionValue(OPTION_SEGMENT_ID); + final String jobId = getOptionValue(OPTION_CUBING_JOB_ID); final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH); final String dictPath = getOptionValue(OPTION_DICT_PATH); final KylinConfig config = KylinConfig.getInstanceFromEnv(); - DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, new DistinctColumnValuesProvider() { + DictionaryGeneratorCLI.processSegment(config, cubeName, segmentID, jobId, new DistinctColumnValuesProvider() { @Override public IReadableTable getDistinctValuesFor(TblColRef col) { return new SortedColumnDFSFile(factColumnsInputPath + "/" + col.getIdentity(), col.getType()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java index c64694c..753b67c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java @@ -61,7 +61,7 @@ public class LookupSnapshotToMetaStoreStep extends AbstractExecutable { CubeDesc cubeDesc = cube.getDescriptor(); try { TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject()); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); logger.info("take snapshot for table:" + lookupTableName); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cube.getConfig()); diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java index 872f570..efdf54b 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java @@ -56,7 +56,7 @@ public class ITSnapshotManagerTest extends HBaseMetadataTestCase { public void basicTest() throws Exception { String tableName = "EDW.TEST_SITES"; TableDesc tableDesc = TableMetadataManager.getInstance(getTestConfig()).getTableDesc(tableName, "default"); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc, getTestConfig()).getResourcePath(); snapshotMgr.wipeoutCache(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java index 2b1bf8e..dcfa19d 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/CnMessage.java @@ -165,6 +165,10 @@ public class CnMessage extends Message { return "Cube 不能被重命名"; } + public String getREBUILD_SNAPSHOT_OF_VIEW() { + return "不支持重新构建 Hive view '%s' 的 snapshot, 请刷新 Cube 的 segment"; + } + // Model public String getINVALID_MODEL_DEFINITION() { return "非法模型定义"; diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java index 5f7e296..7c0dbe3 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java @@ -165,6 +165,10 @@ public class Message { return "Cube renaming is not allowed."; } + public String getREBUILD_SNAPSHOT_OF_VIEW() { + return "Rebuild snapshot of hive view '%s' is not supported, please refresh segment of the cube"; + } + // Model public String getINVALID_MODEL_DEFINITION() { return "The data model definition is invalid."; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 5e2c49e..c5178ab 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -57,6 +57,7 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.project.RealizationEntry; @@ -490,8 +491,14 @@ public class CubeService extends BasicService implements InitializingBean { public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable) throws IOException { aclEvaluate.checkProjectOperationPermission(cube); + Message msg = MsgPicker.getMsg(); + TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject()); + if (tableDesc.isView()) { + throw new BadRequestException( + String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName())); + } CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY); - getCubeManager().buildSnapshotTable(seg, lookupTable); + getCubeManager().buildSnapshotTable(seg, lookupTable, null); return cube; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 8f8658c..71509bc 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -66,6 +66,7 @@ import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentRange.TSRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.exception.BadRequestException; import org.apache.kylin.rest.msg.Message; @@ -394,6 +395,12 @@ public class JobService extends BasicService implements InitializingBean { public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException { + Message msg = MsgPicker.getMsg(); + TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject()); + if (tableDesc.isView()) { + throw new BadRequestException( + String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName())); + } LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build(); getExecutableManager().addJob(job); diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 8748009..1bb03e4 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -373,7 +373,7 @@ public class TableService extends BasicService { public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException { TableDesc tableDesc = getTableManager().getTableDesc(tableName, project); - IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc); + IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null); TableSignature signature = hiveTable.getSignature(); return internalGetLookupTableSnapshots(tableName, signature); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 9a2c242..94c1a02 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -50,8 +50,8 @@ public class HiveInputBase { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HiveInputBase.class); - protected static String getTableNameForHCat(TableDesc table) { - String tableName = (table.isView()) ? table.getMaterializedName() : table.getName(); + protected static String getTableNameForHCat(TableDesc table, String uuid) { + String tableName = (table.isView()) ? table.getMaterializedName(uuid) : table.getName(); String database = (table.isView()) ? KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() : table.getDatabase(); return String.format("%s.%s", database, tableName).toUpperCase(); @@ -93,7 +93,7 @@ public class HiveInputBase { } protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, - String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables) { + String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -118,8 +118,8 @@ public class HiveInputBase { hiveCmdBuilder.addStatement(hiveInitStatements); for (TableDesc lookUpTableDesc : lookupViewsTables) { String identity = lookUpTableDesc.getIdentity(); - String intermediate = lookUpTableDesc.getMaterializedName(); if (lookUpTableDesc.isView()) { + String intermediate = lookUpTableDesc.getMaterializedName(uuid); String materializeViewHql = materializeViewHql(intermediate, identity, jobWorkingDir); hiveCmdBuilder.addStatement(materializeViewHql); intermediateTables.add(intermediate); @@ -134,7 +134,7 @@ public class HiveInputBase { protected static String materializeViewHql(String viewName, String tableName, String jobWorkingDir) { StringBuilder createIntermediateTableHql = new StringBuilder(); createIntermediateTableHql.append("DROP TABLE IF EXISTS " + viewName + ";\n"); - createIntermediateTableHql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName + createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + viewName + " LIKE " + tableName + " LOCATION '" + jobWorkingDir + "/" + viewName + "';\n"); createIntermediateTableHql.append("ALTER TABLE " + viewName + " SET TBLPROPERTIES('auto.purge'='true');\n"); createIntermediateTableHql.append("INSERT OVERWRITE TABLE " + viewName + " SELECT * FROM " + tableName + ";\n"); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 33b1059..d6b85ed 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -58,8 +58,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { } @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table) { - return new HiveTableInputFormat(getTableNameForHCat(table)); + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) { + return new HiveTableInputFormat(getTableNameForHCat(table, uuid)); } @Override @@ -139,7 +139,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, + cubeInstance.getDescriptor())); } // special for hive @@ -158,7 +159,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables); + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, + flatDesc, hiveViewIntermediateTables, jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } @@ -194,7 +196,8 @@ public class HiveMRInput extends HiveInputBase implements IMRInput { * @deprecated For backwards compatibility. */ @Deprecated - public static class RedistributeFlatHiveTableStep extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep { + public static class RedistributeFlatHiveTableStep + extends org.apache.kylin.source.hive.RedistributeFlatHiveTableStep { } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index 938114c..b536bf0 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -54,12 +54,12 @@ public class HiveSource implements ISource { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { // hive view must have been materialized already // ref HiveMRInput.createLookupHiveViewMaterializationStep() if (tableDesc.isView()) { KylinConfig config = KylinConfig.getInstanceFromEnv(); - String tableName = tableDesc.getMaterializedName(); + String tableName = tableDesc.getMaterializedName(uuid); tableDesc = new TableDesc(); tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java index 881be1a..d710db7 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java @@ -85,7 +85,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput { // then count and redistribute if (cubeConfig.isHiveRedistributeEnabled()) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, cubeInstance.getDescriptor())); + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName, flatDesc, + cubeInstance.getDescriptor())); } // special for hive @@ -96,7 +97,8 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput { final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir); - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, flatDesc, hiveViewIntermediateTables); + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir, + flatDesc, hiveViewIntermediateTables, jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java index da44ea5..18d14b8 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java @@ -64,7 +64,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab String project = conf.get(BatchConstants.CFG_PROJECT_NAME); String tableName = conf.get(BatchConstants.CFG_TABLE_NAME); tableDesc = TableMetadataManager.getInstance(config).getTableDesc(tableName, project); - tableInputFormat = MRUtil.getTableInputFormat(tableDesc); + tableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID)); } @Override diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index dd32a58..f51fce0 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -50,7 +50,8 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); + protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true) + .withDescription("The hive table name").create("table"); public HiveColumnCardinalityJob() { } @@ -90,7 +91,8 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); // Mapper - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project); + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project, + getOptionValue(OPTION_CUBING_JOB_ID)); tableInputFormat.configureJob(job); job.setMapperClass(ColumnCardinalityMapper.class); diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java index 37d119e..20e882a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java @@ -51,7 +51,7 @@ public class JdbcSource implements ISource { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { return new JdbcTable(tableDesc); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 2c95c1c..73b224e 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -61,7 +61,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput { } @Override - public IMRTableInputFormat getTableInputFormat(TableDesc table) { + public IMRTableInputFormat getTableInputFormat(TableDesc table, String uuid) { return new KafkaTableInputFormat(cubeSegment, null); } diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 264f2ce..0d9c845 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -71,7 +71,7 @@ public class KafkaSource implements ISource { } @Override - public IReadableTable createReadableTable(TableDesc tableDesc) { + public IReadableTable createReadableTable(TableDesc tableDesc, String uuid) { throw new UnsupportedOperationException(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java index fb5bab5..1d9181b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java @@ -82,7 +82,7 @@ public class HBaseLookupMRSteps { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject()); - IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); + IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc, context.getJobFlow().getId()); try { ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot( sourceTable.getSignature(), tableName); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java index 054e146..199a1fe 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java @@ -90,6 +90,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); String tableName = getOptionValue(OPTION_TABLE_NAME); String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID); + String jobId = getOptionValue(OPTION_CUBING_JOB_ID); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(kylinConfig); @@ -101,7 +102,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID); - IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); + IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc, jobId); logger.info("create HTable for source table snapshot:{}", tableName); Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig); @@ -118,7 +119,7 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { FileOutputFormat.setOutputPath(job, output); - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc); + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc, jobId); tableInputFormat.configureJob(job); job.setMapperClass(LookupTableToHFileMapper.class); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java index 4be9533..0ad63e9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; @@ -79,7 +80,8 @@ public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object, keyColumns[i] = keyColRefs[i].getName(); } encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum); - lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc); + Configuration conf = context.getConfiguration(); + lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc, conf.get(BatchConstants.ARG_CUBING_JOB_ID)); } @Override