KYLIN-2328 Reduce the size of metadata uploaded to distributed cache
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6ea03b86
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6ea03b86
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6ea03b86

Branch: refs/heads/master-cdh5.7
Commit: 6ea03b8638d72853959aba0666aea18f1ba97391
Parents: 64c3c61
Author: gaodayue <gaoda...@meituan.com>
Authored: Wed Dec 28 15:27:49 2016 +0800
Committer: gaodayue <gaoda...@meituan.com>
Committed: Thu Dec 29 18:39:13 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     | 43 ++++++++++++++------
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  2 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java |  7 ++--
 .../kylin/engine/mr/steps/InMemCuboidJob.java   | 10 ++---
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  3 +-
 .../cardinality/HiveColumnCardinalityJob.java   |  2 +-
 .../source/kafka/hadoop/KafkaFlatTableJob.java  |  3 ---
 .../kylin/storage/hbase/steps/CubeHFileJob.java | 11 +++--
 8 files changed, 48 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
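In effect, the single attachKylinPropsAndMetadata entry point is split into purpose-specific variants, so each job only ships the dictionaries it actually reads. A condensed view of the new entry points in AbstractHadoopJob (signatures taken from the diff below, bodies elided; the per-job mapping in the comments follows the call-site changes):

    // table-level jobs (HiveColumnCardinalityJob): one table's metadata only
    protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException;

    // cube-level jobs that read no dictionaries (FactDistinctColumnsJob, CubeHFileJob)
    protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException;

    // merge jobs: cube metadata plus the dictionaries of all segments (MergeCuboidJob)
    protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException;

    // per-segment cubing jobs: cube metadata plus one segment's dictionaries
    // (CuboidJob, InMemCuboidJob)
    protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException;

All four collect a Set<String> of resource paths and hand it to the now-private attachKylinPropsAndMetadata(Set<String>, KylinConfig, Configuration), which dumps those resources into a temporary local metadata directory for upload to the distributed cache.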
http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 80636d3..4693ac3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -29,7 +29,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -449,33 +448,49 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+    protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(table.getResourcePath());
         attachKylinPropsAndMetadata(dumpList, KylinConfig.getInstanceFromEnv(), conf);
     }
 
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        // write cube / model_desc / cube_desc / dict / table
+    protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        attachKylinPropsAndMetadata(collectCubeMetadata(cube), cube.getConfig(), conf);
+    }
+
+    protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(collectCubeMetadata(cube));
+        for (CubeSegment segment : cube.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+    }
+
+    protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
+        dumpList.addAll(segment.getDictionaryPaths());
+        attachKylinPropsAndMetadata(dumpList, segment.getConfig(), conf);
+    }
+
+    private Set<String> collectCubeMetadata(CubeInstance cube) {
+        // cube, model_desc, cube_desc, table
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(cube.getResourcePath());
         dumpList.add(cube.getDescriptor().getModel().getResourcePath());
         dumpList.add(cube.getDescriptor().getResourcePath());
-        for (TableRef tableRef: cube.getDescriptor().getModel().getAllTables()) {
+        for (TableRef tableRef : cube.getDescriptor().getModel().getAllTables()) {
             TableDesc table = tableRef.getTableDesc();
             dumpList.add(table.getResourcePath());
-            List<String> dependentResources = SourceFactory.getMRDependentResources(table);
-            dumpList.addAll(dependentResources);
-        }
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.addAll(SourceFactory.getMRDependentResources(table));
         }
-        attachKylinPropsAndMetadata(dumpList, cube.getConfig(), conf);
+        return dumpList;
     }
 
-    protected void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
+    private void attachKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig, Configuration conf) throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
@@ -524,6 +539,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     }
 
     private void dumpResources(KylinConfig kylinConfig, File metaDir, Set<String> dumpList) throws IOException {
+        long startTime = System.currentTimeMillis();
+
         ResourceStore from = ResourceStore.getStore(kylinConfig);
         KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
         ResourceStore to = ResourceStore.getStore(localConfig);
@@ -534,6 +551,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             to.putResource(path, res.inputStream, res.timestamp);
             res.inputStream.close();
         }
+
+        logger.debug("Dump resources to {} took {} ms", metaDir, System.currentTimeMillis() - startTime);
     }
 
     protected void deletePath(Configuration conf, Path path) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d3cb494..bd305c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -129,7 +129,7 @@ public class CuboidJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
 
             LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 9fc8922..ce01eb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -110,13 +110,12 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                     logger.error(s.getName() + " with status " + s.getStatus());
                 }
                 throw new IllegalStateException();
-            } else {
-                logger.info("Found segment: " + segment);
             }
-            setupMapper(cube.getSegmentById(segmentID));
+
+            setupMapper(segment);
             setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachCubeMetadata(cube, job.getConfiguration());
 
             return waitForCompletion(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 576ace9..1612866 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -86,11 +86,11 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+            CubeSegment segment = cube.getSegmentById(segmentID);
             String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
             if (checkSkip(cubingJobId)) {
-                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeSeg);
+                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment);
                 return 0;
             }
 
@@ -101,14 +101,14 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
             flatTableInputFormat.configureJob(job);
 
             // set mapper
@@ -118,7 +118,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             // set output
             job.setReducerClass(InMemCuboidReducer.class);
-            job.setNumReduceTasks(calculateReducerNum(cubeSeg));
+            job.setNumReduceTasks(calculateReducerNum(segment));
 
             // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
             job.setOutputFormatClass(SequenceFileOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index e805d25..012e19f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -79,7 +79,8 @@ public class MergeCuboidJob extends CuboidJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            // TODO actually only dictionaries from merging segments are needed
+            attachCubeMetadataWithDict(cube, job.getConfiguration());
 
             LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index 3c88024..ea72b54 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -103,7 +103,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
         logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
 
         TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(table);
-        attachKylinPropsAndMetadata(tableDesc, job.getConfiguration());
+        attachTableMetadata(tableDesc, job.getConfiguration());
 
         int result = waitForCompletion(job);
 
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
index f0f48c0..11466e5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java
@@ -120,9 +120,6 @@ public class KafkaFlatTableJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
             deletePath(job.getConfiguration(), output);
-
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
             return waitForCompletion(job);
 
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6ea03b86/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 9593372..1a624c4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -88,19 +88,18 @@ public class CubeHFileJob extends AbstractHadoopJob {
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            Configuration conf = HBaseConfiguration.create(getConf());
             // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+            attachCubeMetadata(cube, job.getConfiguration());
 
-            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
+            Configuration hbaseConf = HBaseConfiguration.create(getConf());
+            HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase());
 
             // Automatic config !
             HFileOutputFormat.configureIncrementalLoad(job, htable);
-            reconfigurePartitions(conf, partitionFilePath);
+            reconfigurePartitions(hbaseConf, partitionFilePath);
 
             // set block replication to 3 for hfiles
-            conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
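Net effect on the call sites: FactDistinctColumnsJob, CubeHFileJob and HiveColumnCardinalityJob no longer upload any dictionaries, the per-segment cubing jobs (CuboidJob, InMemCuboidJob) upload only the dictionaries of the segment being built, MergeCuboidJob still uploads the dictionaries of all segments (see the TODO above), and KafkaFlatTableJob stops attaching metadata altogether. A minimal hypothetical job showing the per-segment variant; the class name, cube name and segment id below are invented for illustration, everything else follows the patterns in this diff:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    public class ExampleSegmentJob extends AbstractHadoopJob {
        @Override
        public int run(String[] args) throws Exception {
            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeMgr.getCube("example_cube");              // hypothetical cube name
            CubeSegment segment = cube.getSegmentById("example-segment-id"); // hypothetical segment id

            Job job = Job.getInstance(getConf(), "ExampleSegmentJob");
            setJobClasspath(job, cube.getConfig());

            // Dumps cube / model_desc / cube_desc / table metadata plus only this
            // segment's dictionaries, instead of the dictionaries of every segment.
            attachSegmentMetadataWithDict(segment, job.getConfiguration());

            return waitForCompletion(job);
        }
    }

Since dictionaries usually dominate the size of the dumped metadata, trimming them per job is where the reduction in distributed-cache upload comes from.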