Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2122 3cd68478b -> aab17b64c (forced update)
KYLIN-2122 Move the partition offset calculation before submitting job Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aab17b64 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aab17b64 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aab17b64 Branch: refs/heads/KYLIN-2122 Commit: aab17b64cbfb823a863deac2891f1c10c6a2d47e Parents: 8172d0b Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Oct 25 11:54:22 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Nov 2 10:56:30 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/cube/CubeInstance.java | 9 ++ .../java/org/apache/kylin/cube/CubeManager.java | 103 +++++------------- .../org/apache/kylin/cube/CubeManagerTest.java | 2 +- .../org/apache/kylin/cube/CubeSegmentsTest.java | 2 +- .../kylin/job/constant/ExecutableConstants.java | 1 + .../java/org/apache/kylin/source/ISource.java | 13 ++- .../org/apache/kylin/source/SourceFactory.java | 2 +- .../apache/kylin/source/SourcePartition.java | 103 ++++++++++++++++++ .../kylin/engine/mr/JobBuilderSupport.java | 2 + .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 67 ++++++++++++ .../kylin/provision/BuildCubeWithEngine.java | 8 +- .../kylin/provision/BuildCubeWithStream.java | 17 ++- .../apache/kylin/rest/service/JobService.java | 16 +-- .../apache/kylin/source/hive/HiveSource.java | 32 +++++- .../apache/kylin/source/kafka/KafkaMRInput.java | 90 ++++++++++------ .../apache/kylin/source/kafka/KafkaSource.java | 105 +++++++++++++++++- .../kylin/source/kafka/config/KafkaConfig.java | 3 + .../source/kafka/hadoop/KafkaInputFormat.java | 41 ++++--- .../kylin/source/kafka/job/SeekOffsetStep.java | 106 +------------------ .../source/kafka/job/UpdateTimeRangeStep.java | 78 +------------- .../kylin/source/kafka/util/KafkaClient.java | 24 ++++- 21 files changed, 483 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 720690d..7222457 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -439,6 +439,15 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0; } + public CubeSegment getLastSegment() { + List<CubeSegment> existing = getSegments(); + if (existing.isEmpty()) { + return null; + } else { + return existing.get(existing.size() - 1); + } + } + @Override public int getSourceType() { return getFactTableDesc().getSourceType(); http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index a53849e..16b468f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
@@ -34,8 +34,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; @@ -68,9 +66,11 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourcePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -434,52 +434,20 @@ public class CubeManager implements IRealizationProvider { // append a full build segment public CubeSegment appendSegment(CubeInstance cube) throws IOException { - return appendSegment(cube, 0, 0, 0, 0, null, null); + return appendSegment(cube, 0, Long.MAX_VALUE, 0, 0, null, null); } public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate) throws IOException { return appendSegment(cube, startDate, endDate, 0, 0, null, null); } + public CubeSegment appendSegment(CubeInstance cube, SourcePartition sourcePartition) throws IOException { + return appendSegment(cube, sourcePartition.getStartDate(), sourcePartition.getEndDate(), sourcePartition.getStartOffset(), sourcePartition.getEndOffset(), sourcePartition.getSourcePartitionOffsetStart(), sourcePartition.getSourcePartitionOffsetEnd()); + } + public CubeSegment appendSegment(CubeInstance cube, long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) throws IOException { checkBuildingSegment(cube); - if (sourcePartitionOffsetStart == null) { - sourcePartitionOffsetStart = Maps.newHashMap(); - } - if (sourcePartitionOffsetEnd == null) { - sourcePartitionOffsetEnd = Maps.newHashMap(); - } - - boolean isOffsetsOn = endOffset != 0; - if (isOffsetsOn == true) { - checkSourceOffsets(startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); - } - - if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) { - // try figure out a reasonable start if missing - if (startDate == 0 && startOffset == 0) { - final CubeSegment last = getLatestSegment(cube); - if (last != null) { - if (isOffsetsOn) { - if (last.getSourceOffsetEnd() == Long.MAX_VALUE) { - throw new IllegalStateException("There is already one pending for building segment, please submit request later."); - } - startOffset = last.getSourceOffsetEnd(); - sourcePartitionOffsetStart = last.getSourcePartitionOffsetEnd(); - } else { - startDate = last.getDateRangeEnd(); - } - } - } - - } else { - startDate = 0; - endDate = Long.MAX_VALUE; - startOffset = 0; - endOffset = 0; - } - CubeSegment newSegment = newSegment(cube, startDate, endDate, startOffset, endOffset); newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart); newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd); @@ -638,7 +606,7 @@ public class CubeManager implements IRealizationProvider { return max; } - private CubeSegment getLatestSegment(CubeInstance cube) { + public CubeSegment getLatestSegment(CubeInstance cube) { List<CubeSegment> existing = cube.getSegments(); if (existing.isEmpty()) { 
return null; @@ -647,49 +615,28 @@ public class CubeManager implements IRealizationProvider { } } - private void checkBuildingSegment(CubeInstance cube) { - int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); - if (cube.getBuildingSegments().size() >= maxBuldingSeg) { - throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; "); + private long calculateStartOffsetForAppendSegment(CubeInstance cube) { + List<CubeSegment> existing = cube.getSegments(); + if (existing.isEmpty()) { + return 0; + } else { + return existing.get(existing.size() - 1).getSourceOffsetEnd(); } } - private void checkSourceOffsets(long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) { - if (endOffset <= 0) - return; - - if (startOffset >= endOffset) { - throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'"); - } - - if (startOffset > 0) { - if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) { - throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset"); - } - - long totalOffset = 0; - for (Long v : sourcePartitionOffsetStart.values()) { - totalOffset += v; - } - - if (totalOffset != startOffset) { - throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); - } + private long calculateStartDateForAppendSegment(CubeInstance cube) { + List<CubeSegment> existing = cube.getSegments(); + if (existing.isEmpty()) { + return cube.getDescriptor().getPartitionDateStart(); + } else { + return existing.get(existing.size() - 1).getDateRangeEnd(); } + } - if (endOffset > 0 && endOffset != Long.MAX_VALUE) { - if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) { - throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); - } - - long totalOffset = 0; - for (Long v : sourcePartitionOffsetEnd.values()) { - totalOffset += v; - } - - if (totalOffset != endOffset) { - throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); - } + private void checkBuildingSegment(CubeInstance cube) { + int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments(); + if (cube.getBuildingSegments().size() >= maxBuldingSeg) { + throw new IllegalStateException("There is already " + cube.getBuildingSegments().size() + " building segment; "); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java index bb90d29..2904eb2 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java @@ -111,7 +111,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase { CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null); seg1.setStatus(SegmentStatusEnum.READY); - CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000, 0, 0, null, null); + CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, null); seg2.setStatus(SegmentStatusEnum.READY); CubeUpdate cubeBuilder = new CubeUpdate(cube); 
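Note on the test changes above: appendSegment() no longer back-fills a missing start from the previous segment (that inference now lives in the ISource implementations further down), so test callers pass an explicitly contiguous range. A minimal sketch, using the same mgr/cube objects the tests already hold:

    CubeSegment seg1 = mgr.appendSegment(cube, 0, 1000, 0, 0, null, null);
    seg1.setStatus(SegmentStatusEnum.READY);
    // CubeManager no longer derives the second segment's start date; the caller states it.
    CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000, 0, 0, null, null);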
http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java index 828a3a9..a5bd821 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java @@ -110,7 +110,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase { seg1.setStatus(SegmentStatusEnum.READY); // append second - CubeSegment seg2 = mgr.appendSegment(cube, 0, 2000); + CubeSegment seg2 = mgr.appendSegment(cube, 1000, 2000); assertEquals(2, cube.getSegments().size()); assertEquals(1000, seg2.getDateRangeStart()); http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index ad0b1b1..cec2e5d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -52,6 +52,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data"; public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; public static final String STEP_NAME_HIVE_CLEANUP = "Hive Cleanup"; + public static final String STEP_NAME_KAFKA_CLEANUP = "Kafka Intermediate File Cleanup"; public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection"; public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS"; public static final String STEP_NAME_BUILD_II = "Build Inverted Index"; http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-metadata/src/main/java/org/apache/kylin/source/ISource.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index e9216f9..5bff8a7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -18,15 +18,18 @@ package org.apache.kylin.source; -import org.apache.kylin.metadata.model.TableDesc; - import java.util.List; +import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.TableDesc; + public interface ISource { - public <I> I adaptToBuildEngine(Class<I> engineInterface); + <I> I adaptToBuildEngine(Class<I> engineInterface); + + ReadableTable createReadableTable(TableDesc tableDesc); - public ReadableTable createReadableTable(TableDesc tableDesc); + List<String> getMRDependentResources(TableDesc table); - public List<String> getMRDependentResources(TableDesc table); + SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java ---------------------------------------------------------------------- diff --git 
a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java index e82c6ed..5ce9014 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourceFactory.java @@ -31,7 +31,7 @@ public class SourceFactory { private static ImplementationSwitch<ISource> sources; static { Map<Integer, String> impls = KylinConfig.getInstanceFromEnv().getSourceEngines(); - sources = new ImplementationSwitch<ISource>(impls, ISource.class); + sources = new ImplementationSwitch<>(impls, ISource.class); } public static ISource tableSource(ISourceAware aware) { http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java new file mode 100644 index 0000000..8ba749d --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source; + +import java.util.Map; + +/** + */ +public class SourcePartition { + long startDate; + long endDate; + long startOffset; + long endOffset; + Map<Integer, Long> sourcePartitionOffsetStart; + Map<Integer, Long> sourcePartitionOffsetEnd; + + public SourcePartition() { + } + + public SourcePartition(long startDate, long endDate, long startOffset, long endOffset, Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd) { + this.startDate = startDate; + this.endDate = endDate; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; + this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; + } + + public long getStartDate() { + return startDate; + } + + public void setStartDate(long startDate) { + this.startDate = startDate; + } + + public long getEndDate() { + return endDate; + } + + public void setEndDate(long endDate) { + this.endDate = endDate; + } + + public long getStartOffset() { + return startOffset; + } + + public void setStartOffset(long startOffset) { + this.startOffset = startOffset; + } + + public long getEndOffset() { + return endOffset; + } + + public void setEndOffset(long endOffset) { + this.endOffset = endOffset; + } + + public Map<Integer, Long> getSourcePartitionOffsetStart() { + return sourcePartitionOffsetStart; + } + + public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) { + this.sourcePartitionOffsetStart = sourcePartitionOffsetStart; + } + + public Map<Integer, Long> getSourcePartitionOffsetEnd() { + return sourcePartitionOffsetEnd; + } + + public void setSourcePartitionOffsetEnd(Map<Integer, Long> sourcePartitionOffsetEnd) { + this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; + } + + public static SourcePartition getCopyOf(SourcePartition origin) { + SourcePartition copy = new SourcePartition(); + copy.setStartDate(origin.getStartDate()); + copy.setEndDate(origin.getEndDate()); + copy.setStartOffset(origin.getStartOffset()); + copy.setEndOffset(origin.getEndOffset()); + copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart()); + copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd()); + return copy; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 159e5cb..47eb9c3 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -95,12 +95,14 @@ public class JobBuilderSupport { public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) { final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, getFactDistinctColumnsPath(jobId)); CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); CubingExecutableUtil.setIndexPath(this.getSecondaryIndexPath(jobId), result.getParams()); + return result; } 
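The flow this commit establishes: partition boundaries (date ranges for Hive, per-partition offsets for Kafka) are resolved by the ISource before any job is submitted, and CubeManager.appendSegment() simply records the result. A condensed sketch of the calling sequence, as wired up in JobService and the integration tests further down (cube, cubeManager and the range arguments are assumed to be in scope):

    ISource source = SourceFactory.tableSource(cube);
    SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, null, null);
    // The source fills in whatever is missing: the last segment's end, the
    // per-partition Kafka offsets, or full-build defaults for an unpartitioned model.
    sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
    CubeSegment newSeg = cubeManager.appendSegment(cube, sourcePartition);
    DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(newSeg, submitter);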
http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index d6435b7..4e1be57 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -18,16 +18,29 @@ package org.apache.kylin.engine.mr.steps; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +72,10 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { segment.setInputRecordsSize(sourceSizeBytes); try { + if (segment.isSourceOffsetsOn()) { + updateTimeRange(segment); + } + cubeManager.promoteNewlyBuiltSegments(cube, segment); return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); } catch (IOException e) { @@ -67,4 +84,54 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { } } + private void updateTimeRange(CubeSegment segment) throws IOException { + final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); + final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); + final Path outputFile = new Path(outputPath, partitionCol.getName()); + + String minValue = null, maxValue = null, currentValue = null; + FSDataInputStream inputStream = null; + BufferedReader bufferedReader = null; + try { + FileSystem fs = HadoopUtil.getFileSystem(outputPath); + inputStream = fs.open(outputFile); + bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + minValue = currentValue = bufferedReader.readLine(); + while (currentValue != null) { + maxValue = currentValue; + currentValue = bufferedReader.readLine(); + } + } catch (IOException e) { + throw e; + } finally { + IOUtils.closeQuietly(bufferedReader); + IOUtils.closeQuietly(inputStream); + } + + final DataType partitionColType = partitionCol.getType(); + FastDateFormat dateFormat; + if (partitionColType.isDate()) { + dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); + } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { + dateFormat = 
DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); + } else if (partitionColType.isStringFamily()) { + String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); + if (StringUtils.isEmpty(partitionDateFormat)) { + partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; + } + dateFormat = DateFormat.getDateFormat(partitionDateFormat); + } else { + throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); + } + + try { + long startTime = dateFormat.parse(minValue).getTime(); + long endTime = dateFormat.parse(maxValue).getTime(); + segment.setDateRangeStart(startTime); + segment.setDateRangeEnd(endTime); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index f6c8801..180d8d9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -55,6 +55,9 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourcePartition; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -404,7 +407,10 @@ public class BuildCubeWithEngine { } private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, endDate); + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + ISource source = SourceFactory.tableSource(cubeInstance); + SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, endDate, 0, 0, null, null)); + CubeSegment segment = cubeManager.appendSegment(cubeInstance, partition.getStartDate(), partition.getEndDate()); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 2faa8d0..000ac16 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -18,6 +18,8 @@ package org.apache.kylin.provision; +import static java.lang.Thread.sleep; + import java.io.File; import java.io.IOException; import java.text.ParseException; @@ -32,7 +34,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import 
java.util.concurrent.TimeUnit; -import com.google.common.collect.Lists; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.requests.MetadataResponse; @@ -44,8 +45,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.metadata.streaming.StreamingConfig; -import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; @@ -55,6 +54,11 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.job.streaming.Kafka10DataLoader; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.metadata.streaming.StreamingManager; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -64,7 +68,7 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.lang.Thread.sleep; +import com.google.common.collect.Lists; /** * for streaming cubing case "test_streaming_table" @@ -253,7 +257,10 @@ public class BuildCubeWithStream { } protected ExecutableState buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { - CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, null, null); + CubeInstance cubeInstance = cubeManager.getCube(cubeName); + ISource source = SourceFactory.tableSource(cubeInstance); + SourcePartition partition = source.parsePartitionBeforeBuild(cubeInstance, new SourcePartition(0, 0, startOffset, endOffset, null, null)); + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), partition); DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); jobService.addJob(job); waitForJob(job.getId()); http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index bc4d89c..49b9b9f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -51,6 +51,9 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.exception.BadRequestException; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.SourceFactory; +import org.apache.kylin.source.SourcePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -206,11 +209,12 @@ public class JobService extends BasicService { } checkCubeDescSignature(cube); - 
DefaultChainedExecutable job; - if (buildType == CubeBuildTypeEnum.BUILD) { - CubeSegment newSeg = getCubeManager().appendSegment(cube, startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); + ISource source = SourceFactory.tableSource(cube); + SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); + sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition); + CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition); job = EngineFactory.createBatchCubingJob(newSeg, submitter); } else if (buildType == CubeBuildTypeEnum.MERGE) { CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force); @@ -364,15 +368,11 @@ public class JobService extends BasicService { @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')") public JobInstance cancelJob(JobInstance job) throws IOException, JobException { - // CubeInstance cube = this.getCubeManager().getCube(job.getRelatedCube()); - // for (BuildCubeJob cubeJob: listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING))) { - // getExecutableManager().stopJob(cubeJob.getId()); - // } CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube()); final String segmentIds = job.getRelatedSegment(); for (String segmentId : StringUtils.split(segmentIds)) { final CubeSegment segment = cubeInstance.getSegmentById(segmentId); - if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) { + if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) { // Remove this segments CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); cubeBuilder.setToRemoveSegs(segment); http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index e9cebea..af0a519 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,13 +18,18 @@ package org.apache.kylin.source.hive; -import com.google.common.collect.Lists; +import java.util.List; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; -import java.util.List; +import com.google.common.collect.Lists; +import org.apache.kylin.source.SourcePartition; //used by reflection public class HiveSource implements ISource { @@ -49,4 +54,27 @@ public class HiveSource implements ISource { return Lists.newArrayList(); } + @Override + public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + SourcePartition result = SourcePartition.getCopyOf(srcPartition); + CubeInstance cube = (CubeInstance) buildable; + if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == true) { + // normal 
partitioned cube + if (result.getStartDate() == 0) { + final CubeSegment last = cube.getLastSegment(); + if (last != null) { + result.setStartDate(last.getDateRangeEnd()); + } + } + } else { + // full build + result.setStartDate(0); + result.setEndDate(Long.MAX_VALUE); + } + + result.setStartOffset(0); + result.setEndOffset(0); + return result; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index fb2a949..cdd7272 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -19,8 +19,12 @@ package org.apache.kylin.source.kafka; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; +import javax.annotation.Nullable; + +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -30,26 +34,33 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; import org.apache.kylin.source.kafka.job.MergeOffsetStep; -import org.apache.kylin.source.kafka.job.SeekOffsetStep; -import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep; +import com.google.common.base.Function; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class KafkaMRInput implements IMRInput { @@ -57,7 +68,7 @@ public class KafkaMRInput implements IMRInput { @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { - this.cubeSegment = (CubeSegment) flatDesc.getSegment(); + this.cubeSegment = (CubeSegment)flatDesc.getSegment(); return new BatchCubingInputSide(cubeSegment); } @@ -65,8 +76,14 @@ public class KafkaMRInput implements IMRInput { public IMRTableInputFormat getTableInputFormat(TableDesc table) { KafkaConfigManager 
kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); - TableRef tableRef = cubeSegment.getCubeInstance().getDataModelDesc().findTable(table.getIdentity()); - List<TblColRef> columns = Lists.newArrayList(tableRef.getColumns()); + List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() { + @Nullable + @Override + public TblColRef apply(ColumnDesc input) { + return input.getRef(); + } + }); + return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null); } @@ -77,15 +94,11 @@ public class KafkaMRInput implements IMRInput { public static class KafkaTableInputFormat implements IMRTableInputFormat { private final CubeSegment cubeSegment; - private List<TblColRef> columns; private StreamingParser streamingParser; - private KafkaConfig kafkaConfig; private final JobEngineConfig conf; public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { this.cubeSegment = cubeSegment; - this.columns = columns; - this.kafkaConfig = kafkaConfig; this.conf = conf; try { streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); @@ -131,21 +144,9 @@ public class KafkaMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { - jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId())); jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); } - public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) { - final SeekOffsetStep result = new SeekOffsetStep(); - result.setName("Seek and update offset step"); - - CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); - CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); - CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); - - return result; - } - private MapReduceExecutable createSaveKafkaDataStep(String jobId) { MapReduceExecutable result = new MapReduceExecutable(); @@ -167,14 +168,10 @@ public class KafkaMRInput implements IMRInput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { - final UpdateTimeRangeStep result = new UpdateTimeRangeStep(); - result.setName("Update Segment Time Range"); - CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); - CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); - CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); - JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM"); - result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId())); - jobFlow.addTask(result); + GarbageCollectionStep step = new GarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_KAFKA_CLEANUP); + step.setDataPath(outputPath); + jobFlow.addTask(step); } @@ -211,4 +208,37 @@ public class KafkaMRInput implements IMRInput { } } + static class GarbageCollectionStep extends AbstractExecutable { + private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class); + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + StringBuffer output = new StringBuffer(); + try { + rmdirOnHDFS(getDataPath()); + } catch (IOException e) { + logger.error("job:" + 
getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); + } + + return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + } + + private void rmdirOnHDFS(String path) throws IOException { + Path externalDataPath = new Path(path); + FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); + if (fs.exists(externalDataPath)) { + fs.delete(externalDataPath, true); + } + } + + public void setDataPath(String externalDataPath) { + setParam("dataPath", externalDataPath); + } + + private String getDataPath() { + return getParam("dataPath"); + } + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 208c0ce..bb676e6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -18,15 +18,24 @@ package org.apache.kylin.source.kafka; -import com.google.common.collect.Lists; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; -import org.apache.kylin.metadata.streaming.StreamingConfig; +import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.streaming.StreamingConfig; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.apache.kylin.source.kafka.util.KafkaClient; -import java.util.List; +import com.google.common.collect.Lists; //used by reflection public class KafkaSource implements ISource { @@ -54,4 +63,94 @@ public class KafkaSource implements ISource { return dependentResources; } + @Override + public SourcePartition parsePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + checkSourceOffsets(srcPartition); + final SourcePartition result = SourcePartition.getCopyOf(srcPartition); + final CubeInstance cube = (CubeInstance) buildable; + if (result.getStartOffset() == 0) { + final CubeSegment last = cube.getLastSegment(); + if (last != null) { + // from last seg's end position + result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd()); + } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { + result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart()); + } else { + // from the topic's very begining; + result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube)); + } + } + + final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable()); + final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); + final String topic = kafakaConfig.getTopic(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { + final List<PartitionInfo> partitionInfos = 
consumer.partitionsFor(topic); + if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) { + // has new partition added + for (int x = result.getSourcePartitionOffsetStart().size(); x < partitionInfos.size(); x++) { + long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); + result.getSourcePartitionOffsetStart().put(partitionInfos.get(x).partition(), earliest); + } + } + } + + if (result.getEndOffset() == Long.MAX_VALUE) { + result.setSourcePartitionOffsetEnd(KafkaClient.getCurrentOffsets(cube)); + } + + long totalStartOffset = 0, totalEndOffset = 0; + for (Long v : result.getSourcePartitionOffsetStart().values()) { + totalStartOffset += v; + } + for (Long v : result.getSourcePartitionOffsetEnd().values()) { + totalEndOffset += v; + } + + result.setStartOffset(totalStartOffset); + result.setEndOffset(totalEndOffset); + + return result; + } + + private void checkSourceOffsets(SourcePartition srcPartition) { + long startOffset = srcPartition.getStartOffset(); + long endOffset = srcPartition.getEndOffset(); + final Map<Integer, Long> sourcePartitionOffsetStart = srcPartition.getSourcePartitionOffsetStart(); + final Map<Integer, Long> sourcePartitionOffsetEnd = srcPartition.getSourcePartitionOffsetEnd(); + if (endOffset <= 0 || startOffset >= endOffset) { + throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'"); + } + + if (startOffset > 0) { + if (sourcePartitionOffsetStart == null || sourcePartitionOffsetStart.size() == 0) { + throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset"); + } + + long totalOffset = 0; + for (Long v : sourcePartitionOffsetStart.values()) { + totalOffset += v; + } + + if (totalOffset != startOffset) { + throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'"); + } + } + + if (endOffset > 0 && endOffset != Long.MAX_VALUE) { + if (sourcePartitionOffsetEnd == null || sourcePartitionOffsetEnd.size() == 0) { + throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's start offset"); + } + + long totalOffset = 0; + for (Long v : sourcePartitionOffsetEnd.values()) { + totalOffset += v; + } + + if (totalOffset != endOffset) { + throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'"); + } + } + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java index c538acb..157d83c 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java @@ -61,6 +61,7 @@ public class KafkaConfig extends RootPersistentEntity { @JsonProperty("parserName") private String parserName; + @Deprecated @JsonProperty("margin") private long margin; @@ -120,10 +121,12 @@ public class KafkaConfig extends RootPersistentEntity { this.name = name; } + @Deprecated public long getMargin() { return margin; } + @Deprecated public void setMargin(long margin) { this.margin = margin; } 
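For readers of the KafkaSource change above, the start-offset resolution it now performs before job submission is, in condensed form (cube is the streaming CubeInstance; error handling and the new-partition check are omitted):

    Map<Integer, Long> start;
    CubeSegment last = cube.getLastSegment();
    if (last != null) {
        start = last.getSourcePartitionOffsetEnd();             // continue from the previous segment
    } else if (cube.getDescriptor().getPartitionOffsetStart() != null
            && !cube.getDescriptor().getPartitionOffsetStart().isEmpty()) {
        start = cube.getDescriptor().getPartitionOffsetStart(); // offsets pinned in the cube descriptor
    } else {
        start = KafkaClient.getEarliestOffsets(cube);           // fall back to the topic head
    }
    // An endOffset of Long.MAX_VALUE is resolved to the topic tail via
    // KafkaClient.getCurrentOffsets(cube), and both maps are summed into the
    // aggregate startOffset/endOffset that name the segment.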
http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java index 81f6bac..fe0e2cc 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -23,9 +23,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import org.apache.kylin.source.kafka.util.KafkaClient; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -36,6 +33,10 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.source.kafka.util.KafkaClient; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; /** * Convert Kafka topic to Hadoop InputFormat @@ -45,16 +46,16 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); + final Configuration conf = context.getConfiguration(); - String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS); - String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC); - String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); - Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); - Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); + final String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS); + final String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC); + final String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); + final Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN)); + final Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX)); - Map<Integer, Long> startOffsetMap = Maps.newHashMap(); - Map<Integer, Long> endOffsetMap = Maps.newHashMap(); + final Map<Integer, Long> startOffsetMap = Maps.newHashMap(); + final Map<Integer, Long> endOffsetMap = Maps.newHashMap(); for (int i = partitionMin; i <= partitionMax; i++) { String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i); String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i); @@ -64,23 +65,19 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } } - List<InputSplit> splits = new ArrayList<InputSplit>(); + final List<InputSplit> splits = new ArrayList<InputSplit>(); try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) { - List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic); Preconditions.checkArgument(partitionInfos.size() == 
startOffsetMap.size(), "partition number mismatch with server side"); for (int i = 0; i < partitionInfos.size(); i++) { - PartitionInfo partition = partitionInfos.get(i); + final PartitionInfo partition = partitionInfos.get(i); int partitionId = partition.partition(); if (startOffsetMap.containsKey(partitionId) == false) { throw new IllegalStateException("Partition '" + partitionId + "' not exists."); } - if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) { - InputSplit split = new KafkaInputSplit( - brokers, inputTopic, - partitionId, - startOffsetMap.get(partitionId), endOffsetMap.get(partitionId) - ); + if (endOffsetMap.get(partitionId) > startOffsetMap.get(partitionId)) { + InputSplit split = new KafkaInputSplit(brokers, inputTopic, partitionId, startOffsetMap.get(partitionId), endOffsetMap.get(partitionId)); splits.add(split); } } @@ -89,9 +86,7 @@ public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> { } @Override - public RecordReader<LongWritable, BytesWritable> createRecordReader( - InputSplit arg0, TaskAttemptContext arg1) throws IOException, - InterruptedException { + public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { return new KafkaInputRecordReader(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java index 98d6e4d..acaa751 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java @@ -17,28 +17,15 @@ */ package org.apache.kylin.source.kafka.job; -import org.apache.kylin.source.kafka.KafkaConfigManager; -import org.apache.kylin.source.kafka.util.KafkaClient; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; -import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.Map; - /** + * Deprecated, not in use. 
*/ public class SeekOffsetStep extends AbstractExecutable { @@ -50,97 +37,8 @@ public class SeekOffsetStep extends AbstractExecutable { @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); - final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - - Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart(); - Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd(); - - if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) { - return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided."); - } - - final Map<Integer, Long> cubeDescStart = cube.getDescriptor().getPartitionOffsetStart(); - if (cube.getSegments().size() == 1 && cubeDescStart != null && cubeDescStart.size() > 0) { - logger.info("This is the first segment, and has initiated the start offsets, will use it"); - startOffsets = cubeDescStart; - } - - final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable()); - final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig); - final String topic = kafakaConfig.getTopic(); - try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { - final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); - - if (startOffsets.isEmpty()) { - // user didn't specify start offset, use the biggest offset in existing segments as start - for (CubeSegment seg : cube.getSegments()) { - Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd(); - for (PartitionInfo partition : partitionInfos) { - int partitionId = partition.partition(); - if (segEndOffset.containsKey(partitionId)) { - startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? 
startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId))); - } - } - } - logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString()); - } - - if (partitionInfos.size() > startOffsets.size()) { - // has new partition added - for (int x = startOffsets.size(); x < partitionInfos.size(); x++) { - long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); - startOffsets.put(partitionInfos.get(x).partition(), earliest); - } - } - - if (endOffsets.isEmpty()) { - // user didn't specify end offset, use latest offset in kafka - for (PartitionInfo partitionInfo : partitionInfos) { - long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition()); - endOffsets.put(partitionInfo.partition(), latest); - } - - logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString()); - } - } - - long totalStartOffset = 0, totalEndOffset = 0; - for (Long v : startOffsets.values()) { - totalStartOffset += v; - } - for (Long v : endOffsets.values()) { - totalEndOffset += v; - } - - if (totalEndOffset > totalStartOffset) { - segment.setSourceOffsetStart(totalStartOffset); - segment.setSourceOffsetEnd(totalEndOffset); - segment.setSourcePartitionOffsetStart(startOffsets); - segment.setSourcePartitionOffsetEnd(endOffsets); - segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset)); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(segment); - try { - cubeManager.updateCube(cubeBuilder); - } catch (IOException e) { - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset)); - } else { - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToRemoveSegs(segment); - try { - cubeManager.updateCube(cubeBuilder); - } catch (IOException e) { - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - - return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes"); - } - + return new ExecuteResult(ExecuteResult.State.SUCCEED, "No in use"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java index d19aa63..8c31c70 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java @@ -17,34 +17,15 @@ */ package org.apache.kylin.source.kafka.job; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.FastDateFormat; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import 

http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
index d19aa63..8c31c70 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -17,34 +17,15 @@
  */
 package org.apache.kylin.source.kafka.job;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Deprecated, not in use.
  */
 public class UpdateTimeRangeStep extends AbstractExecutable {
@@ -56,62 +37,7 @@ public class UpdateTimeRangeStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
-        try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
-            }
-        } catch (IOException e) {
-            logger.error("fail to read file " + outputFile, e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
-        }
-
-        final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
-        }
-
-        try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-            cubeBuilder.setToUpdateSegs(segment);
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (Exception e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
     }
 }
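For illustration only, not part of this commit: with the KafkaClient helpers touched below, the whole start/end offset range of a new segment can be computed up front, before the build job is submitted, which is consistent with the two steps above being reduced to no-ops. A rough sketch in Java; "cube" is an assumed CubeInstance whose fact table is a Kafka topic, and the local names are hypothetical:

    // Resolve broker list and topic from the cube's Kafka config.
    KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable());
    String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
    String topic = kafkaConfig.getTopic();

    // Start of the range: the earliest offset of every partition
    // (or the previous segments' end offsets, as sketched earlier).
    Map<Integer, Long> startOffsets = KafkaClient.getEarliestOffsets(cube);

    // End of the range: a snapshot of the latest offset of every partition.
    Map<Integer, Long> endOffsets = Maps.newHashMap();
    try (KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
        for (PartitionInfo p : consumer.partitionsFor(topic)) {
            endOffsets.put(p.partition(), KafkaClient.getLatestOffset(consumer, topic, p.partition()));
        }
    }

    // Only create the segment and submit the job when there is something new to consume.
    long totalStart = 0, totalEnd = 0;
    for (Long v : startOffsets.values()) {
        totalStart += v;
    }
    for (Long v : endOffsets.values()) {
        totalEnd += v;
    }
    boolean hasNewMessages = totalEnd > totalStart;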

http://git-wip-us.apache.org/repos/asf/kylin/blob/aab17b64/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index 685af6a..a0bbd22 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -59,7 +59,7 @@ public class KafkaClient {
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 50);
-        props.put("timeout.ms", "30000");
+        props.put("request.timeout.ms", "30000");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
@@ -75,12 +75,12 @@ public class KafkaClient {
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("group.id", consumerGroup);
         props.put("session.timeout.ms", "30000");
-        props.put("enable.auto.commit", "false");
         if (properties != null) {
             for (Map.Entry entry : properties.entrySet()) {
                 props.put(entry.getKey(), entry.getValue());
             }
         }
+        props.put("enable.auto.commit", "false");
         return props;
     }
@@ -126,7 +126,25 @@ public class KafkaClient {
         try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
             final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
             for (PartitionInfo partitionInfo : partitionInfos) {
-                long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+                long latest = getLatestOffset(consumer, topic, partitionInfo.partition());
+                startOffsets.put(partitionInfo.partition(), latest);
+            }
+        }
+        return startOffsets;
+    }
+
+
+    public static Map<Integer, Long> getEarliestOffsets(final CubeInstance cubeInstance) {
+        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable());
+
+        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+        final String topic = kafakaConfig.getTopic();
+
+        Map<Integer, Long> startOffsets = Maps.newHashMap();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cubeInstance.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                long latest = getEarliestOffset(consumer, topic, partitionInfo.partition());
+                startOffsets.put(partitionInfo.partition(), latest);
             }
         }