Repository: kylin
Updated Branches:
  refs/heads/1.4-rc 303f21761 -> 08b2051f8
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure

Conflicts:
	engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/809fc627
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/809fc627
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/809fc627

Branch: refs/heads/1.4-rc
Commit: 809fc6272b51fc82cc246ca8dec2644242645d83
Parents: fbed054
Author: shaofengshi <shaofeng...@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sun Feb 14 11:33:35 2016 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/CompoundCuboidWriter.java  | 57 ++++++++++++++
 .../kylin/cube/inmemcubing/ICuboidWriter.java   |  4 +-
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
 .../mr/steps/MapContextGTRecordWriter.java      | 69 ++---------------
 .../streaming/cube/StreamingCubeBuilder.java    | 12 ++-
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 24 +++---
 .../hbase/steps/HBaseMROutput2Transition.java   |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  2 +-
 .../hbase/steps/HBaseStreamingOutput.java       |  8 +-
 .../hbase/steps/SequenceFileCuboidWriter.java   | 75 ++++++++++++++++++
 11 files changed, 254 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
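Orientation before the per-file diffs: streaming cube builds previously wrote cuboid data only to HBase, so the merge job found no cuboid files on HDFS. After this commit the cuboid output is fanned out through the new CompoundCuboidWriter to both HBase and a SequenceFile under the segment's cuboid root path. The following sketch is illustrative only and not part of the patch (the class and helper-method names are invented); the composition mirrors what HBaseStreamingOutput sets up in the diff further below.

import java.util.List;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.storage.hbase.steps.HBaseCuboidWriter;
import org.apache.kylin.storage.hbase.steps.SequenceFileCuboidWriter;

import com.google.common.collect.Lists;

public class CuboidWriterCompositionSketch {

    // Hypothetical helper (not in the patch): build one writer that duplicates every
    // cuboid record to HBase (for queries) and to a SequenceFile on HDFS (for merge).
    public static ICuboidWriter newStreamingCuboidWriter(CubeSegment segment, HTableInterface hTable) {
        List<ICuboidWriter> writers = Lists.newArrayList();
        writers.add(new HBaseCuboidWriter(segment, hTable));
        writers.add(new SequenceFileCuboidWriter(segment.getCubeDesc(), segment));
        return new CompoundCuboidWriter(writers); // fan-out wrapper introduced by this commit
    }
}

Because ICuboidWriter.flush() and close() now declare IOException, callers such as StreamingCubeBuilder close the composed writer in a finally block, as its diff below shows.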

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+    private Iterable<ICuboidWriter> cuboidWriters;
+
+    public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+        this.cuboidWriters = cuboidWriters;
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, record);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.flush();
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
-    void flush();
+    void flush() throws IOException;
 
-    void close();
+    void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+    public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
 
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class KVGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private AbstractRowKeyEncoder rowKeyEncoder;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    //for shard
+
+    public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            // output another cuboid
+            initVariables(cuboidId);
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, keyBuf.length);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        writeAsKeyValue(outputKey, outputValue);
+    }
+
+    protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
+
+    private void initVariables(Long cuboidId) {
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
+        keyBuf = rowKeyEncoder.createBuf();
+
+        dimensions = Long.bitCount(cuboidId);
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 1aa2b2e..9f93816 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -18,77 +18,33 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
 
 /**
  */
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
     protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    private long cuboidRowCount = 0;
-
-    //for shard
 
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        super(cubeDesc, cubeSegment);
         this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
     }
 
     @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            if (lastCuboidId != null) {
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-            // output another cuboid
-            initVariables(cuboidId);
-            lastCuboidId = cuboidId;
-        }
-
-        cuboidRowCount++;
-        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, keyBuf.length);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
         try {
-            mapContext.write(outputKey, outputValue);
+            mapContext.write(key, value);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            throw new IOException(e);
        }
     }
 
@@ -101,15 +57,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
 
     public void close() {
     }
-
-    private void initVariables(Long cuboidId) {
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc, cuboidId));
-        keyBuf = rowKeyEncoder.createBuf();
-
-        dimensions = BitSet.valueOf(new long[] { cuboidId }).cardinality();
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index ae72218..e285ee8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -98,6 +98,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
             throw new RuntimeException(e);
         } catch (ExecutionException e) {
             throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } catch (IOException e) {
+            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } finally {
+            try {
+                cuboidWriter.close();
+            } catch (IOException e) {
+                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+            }
         }
     }
 
@@ -106,7 +114,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         try {
-            return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(), streamingBatch.getTimeRange().getSecond(), false, false);
+            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+            return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index 31cce7b..29d646f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
  */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
 
 /**
  */
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
 
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+    private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
 
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
     }
 
-    public final void flush() {
-        try {
+    @Override
+    public final void flush() throws IOException {
         if (!puts.isEmpty()) {
             long t = System.currentTimeMillis();
             if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
             logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms");
             puts.clear();
         }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
-    public void close() {
-
+    public void close() throws IOException {
+        flush();
+        IOUtils.closeQuietly(hTable);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 9036cf4..bb83901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 1ab14b4..62a8130 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -179,7 +179,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         toDeletePaths.addAll(getMergingHDFSPaths());
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
         step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index fdab8eb..19873c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
-            return new HBaseCuboidWriter(cubeSegment, hTable);
+            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+            return new CompoundCuboidWriter(cuboidWriters);
         } catch (IOException e) {
             throw new RuntimeException("failed to get ICuboidWriter", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/809fc627/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+    private SequenceFile.Writer writer = null;
+
+    public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+        super(cubeDesc, segment);
+    }
+
+
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException {
+        if (writer == null) {
+            synchronized (SequenceFileCuboidWriter.class) {
+                if (writer == null) {
+                    JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "SYSTEM");
+                    String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+                    Path cuboidPath = new Path(cuboidRoot);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+                    try {
+                        if (fs.exists(cuboidPath)) {
+                            fs.delete(cuboidPath, true);
+                        }
+
+                        fs.mkdirs(cuboidPath);
+                    } finally {
+                        IOUtils.closeQuietly(fs);
+                    }
+
+                    Path cuboidFile = new Path(cuboidPath, "data.seq");
+                    logger.debug("Cuboid is written to " + cuboidFile);
+                    writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(), SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+                }
+            }
+        }
+
+        Text outputValue = new Text();
+        Text outputKey = new Text();
+        outputKey.set(key.array(), key.offset(), key.length());
+        outputValue.set(value.array(), value.offset(), value.length());
+        writer.append(outputKey, outputValue);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (writer != null) {
+            writer.hflush();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(writer);
+    }
+}
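
One way to sanity-check the result of this change (not part of the patch): after a streaming build, the segment's cuboid root path should now contain a data.seq SequenceFile, which the later merge step can consume. A minimal, hypothetical checker using the standard Hadoop API, assuming the Text key/value classes written by SequenceFileCuboidWriter above; the file path argument is whatever the cuboid root resolves to in a given deployment:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class CuboidSeqFileCheck {

    public static void main(String[] args) throws Exception {
        // args[0]: path to the segment's cuboid file, e.g. <cuboid root>/data.seq
        Configuration conf = new Configuration();
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(args[0])))) {
            Text key = new Text();
            Text value = new Text();
            long rows = 0;
            while (reader.next(key, value)) {
                rows++;
            }
            System.out.println("cuboid rows in " + args[0] + ": " + rows);
        }
    }
}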