Repository: kylin
Updated Branches:
  refs/heads/master 4917c6581 -> c987b30fa
KYLIN-3010 Remove v1 Spark engine

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c987b30f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c987b30f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c987b30f

Branch: refs/heads/master
Commit: c987b30fab06fa7b0a89cfc468db63bf2fe411cf
Parents: 4917c65
Author: shaofengshi <[email protected]>
Authored: Sun Nov 5 19:26:24 2017 +0800
Committer: shaofengshi <[email protected]>
Committed: Sun Nov 5 19:26:24 2017 +0800

----------------------------------------------------------------------
 .../engine/spark/SparkBatchCubingEngine.java    |  67 ---
 .../kylin/engine/spark/SparkCountDemo.java      |  80 ---
 .../apache/kylin/engine/spark/SparkCubing.java  | 585 -------------------
 .../engine/spark/SparkCubingJobBuilder.java     |  69 ---
 .../kylin/engine/spark/SparkCuboidWriter.java   |  29 -
 .../kylin/engine/spark/SparkHelloWorld.java     |  37 --
 .../kylin/engine/spark/SparkHiveDemo.java       |  52 --
 .../engine/spark/cube/BufferedCuboidWriter.java | 106 ----
 .../spark/cube/DefaultTupleConverter.java       |  98 ----
 .../spark/cube/ListBackedCuboidWriter.java      |  59 --
 .../kylin/engine/spark/cube/TupleConverter.java |  29 -
 .../spark/cube/BufferedCuboidWriterTest.java    |  69 ---
 12 files changed, 1280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
deleted file mode 100644
index 08ed207..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-
-/**
- */
-public class SparkBatchCubingEngine implements IBatchCubingEngine {
-
-    private final String confPath;
-    private final String coprocessor;
-
-    public SparkBatchCubingEngine(String confPath, String coprocessor) {
-        this.confPath = confPath;
-        this.coprocessor = coprocessor;
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new SparkCubingJobBuilder(newSegment, submitter, confPath, coprocessor).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return null;
-    }
-
-    @Override
-    public Class<?> getSourceInterface() {
-        return null;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return null;
-    }
-
-    @Override
-    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
-        throw new UnsupportedOperationException();
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
deleted file mode 100644
index 6478c10..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-
-/**
- */
-public class SparkCountDemo extends AbstractApplication {
-
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-
-    private Options options;
-
-    public SparkCountDemo() {
-        options = new Options();
-        // options.addOption(OPTION_INPUT_PATH);
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
-
-            @Override
-            public Tuple2<String, Integer> call(String s) throws Exception {
-                return new Tuple2<String, Integer>(s, s.length());
-            }
-        }).sortByKey();
-        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
-
-        System.out.println("line number:" + logData.count());
-
-        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
-            @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
-                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
-                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
-                return new Tuple2(key, value);
-            }
-        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class);
-
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
deleted file mode 100644
index c8aee5d..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
-import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
-import org.apache.kylin.engine.spark.util.IteratorUtils;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
-import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
-import org.apache.spark.Partitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.primitives.UnsignedBytes;
-
-import scala.Tuple2;
-
-/**
- */
-public class SparkCubing extends AbstractApplication {
-
-    protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
-
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
-    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
-    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
-
-    private Options options;
-
-    public SparkCubing() {
-        options = new Options();
-        options.addOption(OPTION_INPUT_PATH);
-        options.addOption(OPTION_CUBE_NAME);
-        options.addOption(OPTION_SEGMENT_ID);
-        options.addOption(OPTION_CONF_PATH);
-        options.addOption(OPTION_COPROCESSOR);
-
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
-        File metaDir = new File(folder);
-        if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
-            System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-            logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
-            kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
-            return kylinConfig;
-        } else {
-            return KylinConfig.getInstanceFromEnv();
-        }
-    }
-
-    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
-        ClassUtil.addClasspath(confPath);
-        final File[] files = new File(confPath).listFiles(new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                if (pathname.getAbsolutePath().endsWith(".xml")) {
-                    return true;
-                }
-                if (pathname.getAbsolutePath().endsWith(".properties")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        if (files == null) {
-            return;
-        }
-        for (File file : files) {
-            sc.addFile(file.getAbsolutePath());
-        }
-    }
-
-    private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final String[] columns = intermediateTable.columns();
-        final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
-        final List<TblColRef> baseCuboidColumn = Cuboid.findById(seg, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
-        final long start = System.currentTimeMillis();
-        final RowKeyDesc rowKey = cubeDesc.getRowkey();
-        for (int i = 0; i < baseCuboidColumn.size(); i++) {
-            TblColRef col = baseCuboidColumn.get(i);
-            if (!rowKey.isUseDictionary(col)) {
-                continue;
-            }
-            final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
-            tblColRefMap.put(rowKeyColumnIndex, col);
-        }
-
-        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-        for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
-            final String column = columns[entry.getKey()];
-            final TblColRef tblColRef = entry.getValue();
-            final Dataset<Row> frame = intermediateTable.select(column).distinct();
-
-            final List<Row> rows = frame.collectAsList();
-            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
-                @Override
-                public Iterator<String> iterator() {
-                    return new Iterator<String>() {
-                        int i = 0;
-
-                        @Override
-                        public boolean hasNext() {
-                            return i < rows.size();
-                        }
-
-                        @Override
-                        public String next() {
-                            if (hasNext()) {
-                                final Row row = rows.get(i++);
-                                final Object o = row.get(0);
-                                return o != null ? o.toString() : null;
-                            } else {
-                                throw new NoSuchElementException();
-                            }
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            })));
-        }
-        final long end = System.currentTimeMillis();
-        CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeBuilder.setToUpdateSegs(seg);
-            cubeManager.updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
-        }
-    }
-
-    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
-        Set<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-        final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
-        for (Long id : allCuboidIds) {
-            zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-        }
-
-        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-
-        final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
-        final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-        final byte[][] row_hashcodes = new byte[nRowKey][];
-
-        for (Long cuboidId : allCuboidIds) {
-            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-
-            long mask = Long.highestOneBit(baseCuboidId);
-            int position = 0;
-            for (int i = 0; i < nRowKey; i++) {
-                if ((mask & cuboidId) > 0) {
-                    cuboidBitSet[position] = i;
-                    position++;
-                }
-                mask = mask >> 1;
-            }
-            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-        }
-
-        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
-
-            final HashFunction hashFunction = Hashing.murmur3_128();
-
-            @Override
-            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
-                for (int i = 0; i < nRowKey; i++) {
-                    Hasher hc = hashFunction.newHasher();
-                    String colValue = v2.get(rowKeyColumnIndexes[i]);
-                    if (colValue != null) {
-                        row_hashcodes[i] = hc.putString(colValue).hash().asBytes();
-                    } else {
-                        row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-                    }
-                }
-
-                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
-                    Hasher hc = hashFunction.newHasher();
-                    HLLCounter counter = v1.get(entry.getKey());
-                    final Integer[] cuboidBitSet = entry.getValue();
-                    for (int position = 0; position < cuboidBitSet.length; position++) {
-                        hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-                    }
-                    counter.add(hc.hash().asBytes());
-                }
-                return v1;
-            }
-        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
-            @Override
-            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
-                Preconditions.checkArgument(v1.size() == v2.size());
-                Preconditions.checkArgument(v1.size() > 0);
-                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
-                    final HLLCounter counter1 = entry.getValue();
-                    final HLLCounter counter2 = v2.get(entry.getKey());
-                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
-                }
-                return v1;
-            }
-
-        });
-        return samplingResult;
-    }
-
-    /** return hfile location */
-    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeSegment, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
-        final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
-        final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
-        for (TblColRef tblColRef : baseCuboidColumn) {
-            columnLengthMap.put(tblColRef, dimEncMap.get(tblColRef).getLengthOfEncoding());
-        }
-        final Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                    Dictionary<String> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        System.err.println("Dictionary for " + col + " was not found.");
-                        continue;
-                    }
-                    dictionaryMap.put(col, dict);
-                    System.out.println("col:" + col + " dictionary size:" + dict.getSize());
-                }
-            }
-        }
-
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            FunctionDesc func = measureDesc.getFunction();
-            List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
-            for (TblColRef col : colRefs) {
-                dictionaryMap.put(col, cubeSegment.getDictionary(col));
-            }
-        }
-
-        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
-
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
-                long t = System.currentTimeMillis();
-                prepare();
-
-                final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-
-                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
-                System.out.println("load properties finished");
-                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(
-                        cubeSegment.getCuboidScheduler(), flatDesc, dictionaryMap);
-                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
-                Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
-                try {
-                    while (listIterator.hasNext()) {
-                        for (List<String> row : listIterator.next()) {
-                            blockingQueue.put(row);
-                        }
-                    }
-                    blockingQueue.put(Collections.<String> emptyList());
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                return sparkCuboidWriter.getResult().iterator();
-            }
-        });
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
-        Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
-        Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
-        String url = conf.get("fs.defaultFS") + path.toString();
-        System.out.println("use " + url + " as hfile");
-        List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
-        final int measureSize = measuresDescs.size();
-        final String[] dataTypes = new String[measureSize];
-        for (int i = 0; i < dataTypes.length; i++) {
-            dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType();
-        }
-        final MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
-        writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
-        return url;
-    }
-
-    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
-        javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
-            @Override
-            public int numPartitions() {
-                return splitKeys.length + 1;
-            }
-
-            @Override
-            public int getPartition(Object key) {
-                Preconditions.checkArgument(key instanceof byte[]);
-                for (int i = 0, n = splitKeys.length; i < n; ++i) {
-                    if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) {
-                        return i;
-                    }
-                }
-                return splitKeys.length;
-            }
-        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
-                return new Iterable<Tuple2<byte[], byte[]>>() {
-                    final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
-                    final Object[] input = new Object[measureSize];
-                    final Object[] result = new Object[measureSize];
-
-                    @Override
-                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
-                            @Override
-                            public byte[] call(Iterable<byte[]> v1) throws Exception {
-                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
-                                if (list.size() == 1) {
-                                    return list.get(0);
-                                }
-                                aggs.reset();
-                                for (byte[] v : list) {
-                                    codec.decode(ByteBuffer.wrap(v), input);
-                                    aggs.aggregate(input);
-                                }
-                                aggs.collectStates(result);
-                                ByteBuffer buffer = codec.encode(result);
-                                byte[] bytes = new byte[buffer.position()];
-                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
-                                return bytes;
-                            }
-                        });
-                    }
-                }.iterator();
-            }
-        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
-            @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
-                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
-                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
-                return new Tuple2(key, value);
-            }
-        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
-    }
-
-    public static void prepare() throws Exception {
-        final File file = new File(SparkFiles.get("kylin.properties"));
-        final String confPath = file.getParentFile().getAbsolutePath();
-        System.out.println("conf directory:" + confPath);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        ClassUtil.addClasspath(confPath);
-    }
-
-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
-        final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
-        System.out.println("cube size estimation:" + cubeSizeMap);
-        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
-        CubeHTableUtil.createHTable(cubeSegment, splitKeys);
-        System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
-        return splitKeys;
-    }
-
-    private Configuration getConfigurationForHFile(String hTableName) throws IOException {
-        final Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        Job job = Job.getInstance(conf);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        HTable table = new HTable(conf, hTableName);
-        HFileOutputFormat.configureIncrementalLoad(job, table);
-        return conf;
-    }
-
-    private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-
-        FsShell shell = new FsShell(hbaseConf);
-        try {
-            shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
-        } catch (Exception e) {
-            logger.error("Couldnt change the file permissions ", e);
-            throw new IOException(e);
-        }
-
-        String[] newArgs = new String[2];
-        newArgs[0] = hfileLocation;
-        newArgs[1] = cubeSegment.getStorageLocationIdentifier();
-
-        int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs);
-        System.out.println("incremental load result:" + ret);
-
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeInstance.setStatus(RealizationStatusEnum.READY);
-            cubeSegment.setStatus(SegmentStatusEnum.READY);
-            cubeBuilder.setToUpdateSegs(cubeSegment);
-            CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
-        }
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        //memory conf
-        conf.set("spark.executor.memory", "6g");
-        conf.set("spark.storage.memoryFraction", "0.3");
-
-        //serialization conf
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
-        conf.set("spark.kryo.registrationRequired", "true");
-
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset<Row> intermediateTable = sqlContext.sql("select * from " + hiveTable);
-        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
-        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
-        final String coprocessor = optionsHelper.getOptionValue(OPTION_COPROCESSOR);
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.overrideCoprocessorLocalJar(coprocessor);
-
-        setupClasspath(sc, confPath);
-        intermediateTable.cache();
-        writeDictionary(intermediateTable, cubeName, segmentId);
-        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
-            @Override
-            public List<String> call(Row v1) throws Exception {
-                ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
-                for (int i = 0; i < v1.size(); i++) {
-                    final Object o = v1.get(i);
-                    if (o != null) {
-                        result.add(o.toString());
-                    } else {
-                        result.add(null);
-                    }
-                }
-                return result;
-
-            }
-        });
-
-        final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
-        final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
-
-        final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
-        bulkLoadHFile(cubeName, segmentId, hfile);
-    }
-
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
deleted file mode 100644
index 76e4521..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class SparkCubingJobBuilder extends JobBuilderSupport {
-    private static final Logger logger = LoggerFactory.getLogger(SparkCubingJobBuilder.class);
-
-    private final IMRInput.IMRBatchCubingInputSide inputSide;
-    private final IMROutput2.IMRBatchCubingOutputSide2 outputSide;
-    private final String confPath;
-    private final String coprocessor;
-
-    public SparkCubingJobBuilder(CubeSegment seg, String submitter, String confPath, String coprocessor) {
-        super(seg, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
-        this.confPath = confPath;
-        this.coprocessor = coprocessor;
-    }
-
-    public DefaultChainedExecutable build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-
-        inputSide.addStepPhase1_CreateFlatTable(result);
-        final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        final String tableName = joinedFlatTableDesc.getTableName();
-        logger.info("intermediate table:" + tableName);
-
-        final SparkExecutable sparkExecutable = new SparkExecutable();
-        sparkExecutable.setClassName(SparkCubing.class.getName());
-        sparkExecutable.setParam("hiveTable", tableName);
-        sparkExecutable.setParam(CubingExecutableUtil.CUBE_NAME, seg.getRealization().getName());
-        sparkExecutable.setParam("segmentId", seg.getUuid());
-        sparkExecutable.setParam("confPath", confPath);
-        sparkExecutable.setParam("coprocessor", coprocessor);
-        result.addTask(sparkExecutable);
-        return result;
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
deleted file mode 100644
index 77ebe69..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-
-import scala.Tuple2;
-
-/**
- */
-public interface SparkCuboidWriter extends ICuboidWriter {
-
-    Iterable<Tuple2<byte[], byte[]>> getResult();
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
deleted file mode 100644
index 4eda50e..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-
-/**
- */
-public class SparkHelloWorld extends AbstractApplication {
-
-    @Override
-    protected Options getOptions() {
-        return new Options();
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        System.out.println("hello kylin-spark");
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
deleted file mode 100644
index 58d4222..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
-
-/**
- */
-public class SparkHiveDemo extends AbstractApplication {
-
-    private final Options options;
-
-    public SparkHiveDemo() {
-        options = new Options();
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset<Row> dataFrame = sqlContext.sql("select * from test_kylin_fact");
-        System.out.println("count * of the table:" + dataFrame.count());
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
deleted file mode 100644
index b3334b7..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.engine.spark.SparkCuboidWriter;
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- */
-public class BufferedCuboidWriter implements SparkCuboidWriter {
-
-    private final LinkedBlockingQueue<Tuple2<byte[], byte[]>> blockingQueue;
-    private final TupleConverter tupleConverter;
-
-    public BufferedCuboidWriter(TupleConverter tupleConverter) {
-        this.blockingQueue = new LinkedBlockingQueue<>(10000);
-        this.tupleConverter = tupleConverter;
-    }
-
-    @Override
-    public void write(final long cuboidId, final GTRecord record) throws IOException {
-        try {
-            blockingQueue.put(tupleConverter.convert(cuboidId, record));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void flush() {
-    }
-
-    @Override
-    public void close() {
-        try {
-            blockingQueue.put(new Tuple2(new byte[0], new byte[0]));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Iterable<Tuple2<byte[], byte[]>> getResult() {
-        return new Iterable<Tuple2<byte[], byte[]>>() {
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                return new Iterator<Tuple2<byte[], byte[]>>() {
-                    Tuple2<byte[], byte[]> current = null;
-
-                    @Override
-                    public boolean hasNext() {
-                        if (current == null) {
-                            try {
-                                current = blockingQueue.take();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new RuntimeException(e);
-                            }
-                        }
-                        return current._1().length > 0 && current._2().length > 0;
-                    }
-
-                    @Override
-                    public Tuple2<byte[], byte[]> next() {
-                        if (hasNext()) {
-                            Tuple2<byte[], byte[]> result = current;
-                            current = null;
-                            return result;
-                        } else {
-                            throw new NoSuchElementException();
-                        }
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
deleted file mode 100644
index d7b20c8..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import scala.Tuple2;
-
-/**
- */
-public final class DefaultTupleConverter implements TupleConverter {
-
-    private final static transient ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>();
-    private final CubeSegment segment;
-    private final int measureCount;
-    private final Map<TblColRef, Integer> columnLengthMap;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
-    private byte[] rowKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
-
-    public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) {
-        this.segment = segment;
-        this.measureCount = segment.getCubeDesc().getMeasures().size();
-        this.columnLengthMap = columnLengthMap;
-        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(this.segment);
-    }
-
-    private ByteBuffer getValueBuf() {
-        if (valueBuf.get() == null) {
-            valueBuf.set(ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE));
-        }
-        return valueBuf.get();
-    }
-
-    private void setValueBuf(ByteBuffer buf) {
-        valueBuf.set(buf);
-    }
-
-    @Override
-    public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-        Cuboid cuboid = Cuboid.findById(segment, cuboidId);
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
-        final int dimensions = Long.bitCount(cuboidId);
-        final ImmutableBitSet measureColumns = new ImmutableBitSet(dimensions, dimensions + measureCount);
-
-        int offSet = 0;
-        for (int x = 0; x < dimensions; x++) {
-            final ByteArray byteArray = record.get(x);
-            System.arraycopy(byteArray.array(), byteArray.offset(), rowKeyBodyBuf, offSet, byteArray.length());
-            offSet += byteArray.length();
-        }
-
-        byte[] rowKey = rowkeyEncoder.createBuf();
-        rowkeyEncoder.encode(new ByteArray(rowKeyBodyBuf, 0, offSet), new ByteArray(rowKey));
-
-        ByteBuffer valueBuf = getValueBuf();
-        valueBuf.clear();
-        try {
-            record.exportColumns(measureColumns, valueBuf);
-        } catch (BufferOverflowException boe) {
-            valueBuf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5));
-            record.exportColumns(measureColumns, valueBuf);
-            setValueBuf(valueBuf);
-        }
-
-        byte[] value = new byte[valueBuf.position()];
-        System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position());
-        return new Tuple2<>(rowKey, value);
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
deleted file mode 100644
index a2740bf..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.kylin.engine.spark.SparkCuboidWriter;
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- */
-public class ListBackedCuboidWriter implements SparkCuboidWriter {
-
-    private final ArrayList<Tuple2<byte[], byte[]>> result;
-    private final TupleConverter tupleConverter;
-
-    public ListBackedCuboidWriter(TupleConverter tupleConverter) {
-        this.result = new ArrayList();
-        this.tupleConverter = tupleConverter;
-    }
-
-    @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-        result.add(tupleConverter.convert(cuboidId, record));
-    }
-
-    @Override
-    public void flush() {
-
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public Iterable<Tuple2<byte[], byte[]>> getResult() {
-        return result;
-    }
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
deleted file mode 100644
index 3dbdc05..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- */
-public interface TupleConverter {
-
-    Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record);
-}


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
deleted file mode 100644
index 8afea55..0000000
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.gridtable.GTRecord;
-import org.junit.Test;
-
-import scala.Tuple2;
-
-/**
- */
-public class BufferedCuboidWriterTest {
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-        final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
-            @Override
-            public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
-            }
-        });
-        final int testCount = 10000000;
-        final Future<?> future = Executors.newCachedThreadPool().submit(new Runnable() {
-            @Override
-            public void run() {
-                int i = 0;
-
-                while (i++ < testCount) {
-                    try {
-                        bufferedCuboidWriter.write(i, null);
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                        throw new RuntimeException(e);
-                    }
-                }
-                bufferedCuboidWriter.close();
-            }
-        });
-        long actualCount = 0;
-        for (Tuple2<byte[], byte[]> tuple2 : bufferedCuboidWriter.getResult()) {
-            assertEquals(Long.parseLong(new String(tuple2._1())), ++actualCount);
-        }
-        future.get();
-        assertEquals(actualCount, testCount);
-
-    }
-}
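
For reference on what this removal drops: the deleted SparkCubing estimated per-cuboid
cardinality by hashing each sampled row's dimension values into one HyperLogLog counter
per cuboid (the sampling() method above). The core trick is mapping a cuboid id, which
is a bitmask over the row-key columns, to the column indexes that participate in that
cuboid. A minimal plain-JDK sketch of that mapping, assuming the deleted code's
conventions for nRowKey and baseCuboidId (class and method names here are illustrative,
not the deleted class's API):

    import java.util.Arrays;

    public class CuboidBitSetSketch {
        // Mirrors the loop in the deleted sampling(): walk the row-key bits from the
        // highest bit of the base cuboid downward, collecting the set positions.
        static Integer[] cuboidColumnIndexes(long cuboidId, int nRowKey, long baseCuboidId) {
            Integer[] bitSet = new Integer[Long.bitCount(cuboidId)];
            long mask = Long.highestOneBit(baseCuboidId);
            int position = 0;
            for (int i = 0; i < nRowKey; i++) {
                if ((mask & cuboidId) > 0) {
                    bitSet[position++] = i; // row-key column i is present in this cuboid
                }
                mask >>= 1;
            }
            return bitSet;
        }

        public static void main(String[] args) {
            // 4 row-key columns, base cuboid 0b1111; cuboid 0b1010 keeps columns 0 and 2
            System.out.println(Arrays.toString(cuboidColumnIndexes(0b1010, 4, 0b1111)));
        }
    }

Per row, the deleted code hashed each participating column's value and fed the
concatenated hashes into that cuboid's HLLCounter, so a single pass over the data
yielded a cardinality estimate for every cuboid at once.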
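Similarly, the deleted BufferedCuboidWriter bridged the in-memory cube builder thread
and the Spark partition iterator with a bounded blocking queue, using a zero-length
tuple as the end-of-stream sentinel. A self-contained sketch of that hand-off pattern,
with plain-JDK types standing in for scala.Tuple2 (names are illustrative, not the
deleted API):

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Iterator;
    import java.util.Map.Entry;
    import java.util.NoSuchElementException;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueBackedIterator implements Iterator<Entry<byte[], byte[]>> {
        // Zero-length pair marks that the producer has called close()
        static final Entry<byte[], byte[]> SENTINEL =
                new SimpleImmutableEntry<>(new byte[0], new byte[0]);

        private final LinkedBlockingQueue<Entry<byte[], byte[]>> queue;
        private Entry<byte[], byte[]> current;

        public QueueBackedIterator(LinkedBlockingQueue<Entry<byte[], byte[]>> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (current == null) {
                try {
                    current = queue.take(); // blocks until the producer writes or closes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            return current.getKey().length > 0; // empty pair is the sentinel
        }

        @Override
        public Entry<byte[], byte[]> next() {
            if (!hasNext())
                throw new NoSuchElementException();
            Entry<byte[], byte[]> result = current;
            current = null;
            return result;
        }
    }

The bounded queue (10000 entries in the deleted class) provided back-pressure: the cube
builder stalls on put() rather than buffering an entire partition's output in memory.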
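Finally, the deleted writeToHFile2() pre-partitioned rowkeys so that each Spark
partition lined up with one pre-created HBase region, by unsigned comparison against
the region split keys. The lookup from inside its anonymous Partitioner, extracted as
a sketch (the method name is mine; Guava's UnsignedBytes is the same comparator the
deleted code used):

    import com.google.common.primitives.UnsignedBytes;

    public class RegionPartitionSketch {
        // splitKeys must be sorted ascending; returns a partition in [0, splitKeys.length]
        static int partitionFor(byte[] key, byte[][] splitKeys) {
            for (int i = 0; i < splitKeys.length; i++) {
                if (UnsignedBytes.lexicographicalComparator().compare(key, splitKeys[i]) < 0) {
                    return i; // key sorts below split i, so it belongs to region i
                }
            }
            return splitKeys.length; // key >= highest split key: last region
        }
    }

Combined with repartitionAndSortWithinPartitions, this kept each output HFile wholly
inside one region, so the subsequent LoadIncrementalHFiles bulk load needed no HFile
splitting.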