Repository: kylin
Updated Branches:
  refs/heads/master-cdh5.7 d5efd44c0 -> 5f8b5e816 (forced update)
KYLIN-1726 package rename

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c67fa740
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c67fa740
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c67fa740

Branch: refs/heads/master-cdh5.7
Commit: c67fa740d364d372c7a6424fd160570cc7e890c4
Parents: 5aee022
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sun Oct 9 13:24:59 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Oct 10 13:32:44 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/kafka/KafkaMRInput.java |   3 +
 .../kylin/source/kafka/MergeOffsetStep.java     |  80 -----------
 .../kylin/source/kafka/SeekOffsetStep.java      | 140 ------------------
 .../source/kafka/StringStreamingParser.java     |  51 -------
 .../apache/kylin/source/kafka/TopicMeta.java    |  46 ------
 .../kylin/source/kafka/UpdateTimeRangeStep.java | 117 ---------------
 .../kylin/source/kafka/job/MergeOffsetStep.java |  80 +++++++++++
 .../kylin/source/kafka/job/SeekOffsetStep.java  | 141 +++++++++++++++++++
 .../source/kafka/job/UpdateTimeRangeStep.java   | 117 +++++++++++++++
 .../kafka/util/ByteBufferBackedInputStream.java |   6 +-
 10 files changed, 344 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
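The three relocated classes are job steps that a Kafka cubing job chains together. As a rough illustration only — the wiring below is NOT taken from this commit; DefaultChainedExecutable, addTask, and the CubingExecutableUtil setters are assumed from Kylin's job framework, and the step names are invented — a build job would register the steps from their new package like this:

    import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    // imports now resolve against the new 'job' package, as in KafkaMRInput below
    import org.apache.kylin.source.kafka.job.SeekOffsetStep;
    import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;

    public class KafkaStepWiringSketch {

        // Illustrative helper, not part of this commit: attach the relocated
        // steps to a chained job, passing cube name and segment id as params,
        // which each step later reads back via CubingExecutableUtil.getCubeName()
        // and getSegmentId() in doWork().
        public static void addKafkaSteps(DefaultChainedExecutable job, String cubeName, String segmentId) {
            SeekOffsetStep seekOffsetStep = new SeekOffsetStep();
            seekOffsetStep.setName("Seek Kafka start/end offsets");
            CubingExecutableUtil.setCubeName(cubeName, seekOffsetStep.getParams());
            CubingExecutableUtil.setSegmentId(segmentId, seekOffsetStep.getParams());
            job.addTask(seekOffsetStep);

            UpdateTimeRangeStep updateTimeRangeStep = new UpdateTimeRangeStep();
            updateTimeRangeStep.setName("Update segment time range");
            CubingExecutableUtil.setCubeName(cubeName, updateTimeRangeStep.getParams());
            CubingExecutableUtil.setSegmentId(segmentId, updateTimeRangeStep.getParams());
            job.addTask(updateTimeRangeStep);
        }
    }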
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 6358ee1..4d1f5c9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -43,6 +43,9 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.job.MergeOffsetStep;
+import org.apache.kylin.source.kafka.job.SeekOffsetStep;
+import org.apache.kylin.source.kafka.job.UpdateTimeRangeStep;
 import javax.annotation.Nullable;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
deleted file mode 100644
index 18c959a..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class MergeOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
-    public MergeOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
-
-        Collections.sort(mergingSegs);
-
-        final CubeSegment first = mergingSegs.get(0);
-        final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
-
-        segment.setSourceOffsetStart(first.getSourceOffsetStart());
-        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
-        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
-
-        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
-        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
-
-        segment.setDateRangeStart(dateRangeStart);
-        segment.setDateRangeEnd(dateRangeEnd);
-
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToUpdateSegs(segment);
-        try {
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
deleted file mode 100644
index 151b912..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class SeekOffsetStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
-
-    public SeekOffsetStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-
-        Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
-        Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
-
-        if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
-        }
-
-        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
-        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
-        final String topic = kafakaConfig.getTopic();
-        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
-            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-
-            if (startOffsets.isEmpty()) {
-                // user didn't specify start offset, use the biggest offset in existing segments as start
-                for (CubeSegment seg : cube.getSegments()) {
-                    Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
-                    for (PartitionInfo partition : partitionInfos) {
-                        int partitionId = partition.partition();
-                        if (segEndOffset.containsKey(partitionId)) {
-                            startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
-                        }
-                    }
-                }
-
-                if (partitionInfos.size() > startOffsets.size()) {
-                    // has new partition added
-                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
-                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
-                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
-                    }
-                }
-
-                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
-            }
-
-            if (endOffsets.isEmpty()) {
-                // user didn't specify end offset, use latest offset in kafka
-                for (PartitionInfo partitionInfo : partitionInfos) {
-                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
-                    endOffsets.put(partitionInfo.partition(), latest);
-                }
-
-                logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
-            }
-        }
-
-        long totalStartOffset = 0, totalEndOffset = 0;
-        for (Long v : startOffsets.values()) {
-            totalStartOffset += v;
-        }
-        for (Long v : endOffsets.values()) {
-            totalEndOffset += v;
-        }
-
-        if (totalEndOffset > totalStartOffset) {
-            segment.setSourceOffsetStart(totalStartOffset);
-            segment.setSourceOffsetEnd(totalEndOffset);
-            segment.setSourcePartitionOffsetStart(startOffsets);
-            segment.setSourcePartitionOffsetEnd(endOffsets);
-            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
-        } else {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToRemoveSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
-        }
-
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
deleted file mode 100644
index f74df83..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public final class StringStreamingParser extends StreamingParser {
-
-    public static final StringStreamingParser instance = new StringStreamingParser(null, null);
-
-    private StringStreamingParser(List<TblColRef> allColumns, Map<String, String> properties) {
-    }
-
-    @Override
-    public StreamingMessage parse(ByteBuffer message) {
-        byte[] bytes = new byte[message.limit()];
-        message.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
-    }
-
-    @Override
-    public boolean filter(StreamingMessage streamingMessage) {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
deleted file mode 100644
index a73543e..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.kafka;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The topic metadata should be invariant, otherwise will cause re-initialization of the Consumer
- *
- */
-public class TopicMeta {
-
-    private final String name;
-
-    private final List<Integer> partitionIds;
-
-    public TopicMeta(String name, List<Integer> partitionIds) {
-        this.name = name;
-        this.partitionIds = Collections.unmodifiableList(partitionIds);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public List<Integer> getPartitionIds() {
-        return partitionIds;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
deleted file mode 100644
index 9e902d8..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.source.kafka;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class UpdateTimeRangeStep extends AbstractExecutable {
-
-    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
-
-    public UpdateTimeRangeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
-        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
-        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
-        try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
-            }
-        } catch (IOException e) {
-            logger.error("fail to read file " + outputFile, e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
-        }
-
-        final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
-        if (partitionColType.isDate()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
-        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
-            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        } else if (partitionColType.isStringFamily()) {
-            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
-            if (StringUtils.isEmpty(partitionDateFormat)) {
-                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
-            }
-            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
-        } else {
-            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
-        }
-
-        try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-            cubeBuilder.setToUpdateSegs(segment);
-            cubeManager.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (Exception e) {
-            logger.error("fail to update cube segment offset", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
new file mode 100644
index 0000000..9cadd72
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class MergeOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
+    public MergeOffsetStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        List<CubeSegment> mergingSegs = cube.getMergingSegments(segment);
+
+        Collections.sort(mergingSegs);
+
+        final CubeSegment first = mergingSegs.get(0);
+        final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
+
+        segment.setSourceOffsetStart(first.getSourceOffsetStart());
+        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+        segment.setSourceOffsetEnd(last.getSourceOffsetEnd());
+        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+
+        long dateRangeStart = CubeManager.minDateRangeStart(mergingSegs);
+        long dateRangeEnd = CubeManager.maxDateRangeEnd(mergingSegs);
+
+        segment.setDateRangeStart(dateRangeStart);
+        segment.setDateRangeEnd(dateRangeEnd);
+
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
new file mode 100644
index 0000000..5751095
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/SeekOffsetStep.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class SeekOffsetStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class);
+
+    public SeekOffsetStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        Map<Integer, Long> startOffsets = segment.getSourcePartitionOffsetStart();
+        Map<Integer, Long> endOffsets = segment.getSourcePartitionOffsetEnd();
+
+        if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided.");
+        }
+
+        final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable());
+        final String brokers = KafkaClient.getKafkaBrokers(kafakaConfig);
+        final String topic = kafakaConfig.getTopic();
+        try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+
+            if (startOffsets.isEmpty()) {
+                // user didn't specify start offset, use the biggest offset in existing segments as start
+                for (CubeSegment seg : cube.getSegments()) {
+                    Map<Integer, Long> segEndOffset = seg.getSourcePartitionOffsetEnd();
+                    for (PartitionInfo partition : partitionInfos) {
+                        int partitionId = partition.partition();
+                        if (segEndOffset.containsKey(partitionId)) {
+                            startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId)));
+                        }
+                    }
+                }
+
+                if (partitionInfos.size() > startOffsets.size()) {
+                    // has new partition added
+                    for (int x = startOffsets.size(); x < partitionInfos.size(); x++) {
+                        long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition());
+                        startOffsets.put(partitionInfos.get(x).partition(), earliest);
+                    }
+                }
+
+                logger.info("Get start offset for segment " + segment.getName() + ": " + startOffsets.toString());
+            }
+
+            if (endOffsets.isEmpty()) {
+                // user didn't specify end offset, use latest offset in kafka
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition());
+                    endOffsets.put(partitionInfo.partition(), latest);
+                }
+
+                logger.info("Get end offset for segment " + segment.getName() + ": " + endOffsets.toString());
+            }
+        }
+
+        long totalStartOffset = 0, totalEndOffset = 0;
+        for (Long v : startOffsets.values()) {
+            totalStartOffset += v;
+        }
+        for (Long v : endOffsets.values()) {
+            totalEndOffset += v;
+        }
+
+        if (totalEndOffset > totalStartOffset) {
+            segment.setSourceOffsetStart(totalStartOffset);
+            segment.setSourceOffsetEnd(totalEndOffset);
+            segment.setSourcePartitionOffsetStart(startOffsets);
+            segment.setSourcePartitionOffsetEnd(endOffsets);
+            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToUpdateSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset));
+        } else {
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToRemoveSegs(segment);
+            try {
+                cubeManager.updateCube(cubeBuilder);
+            } catch (IOException e) {
+                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+            }
+
+            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
+        }
+
+
+    }
+
+}
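To make the start-offset resolution in SeekOffsetStep concrete: when no start offset is provided, the step takes, per Kafka partition, the largest end offset recorded by any existing segment. A self-contained illustration with made-up numbers (plain Java, independent of Kylin):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class StartOffsetExample {
        public static void main(String[] args) {
            // End offsets of two existing segments, keyed by partition id
            // (numbers invented for the example).
            Map<Integer, Long> segmentA = new HashMap<>();
            segmentA.put(0, 100L);
            segmentA.put(1, 80L);
            Map<Integer, Long> segmentB = new HashMap<>();
            segmentB.put(0, 150L);
            segmentB.put(1, 60L);

            // Mirror of the loop in SeekOffsetStep: the next segment starts
            // at the largest end offset seen per partition.
            Map<Integer, Long> startOffsets = new HashMap<>();
            for (Map<Integer, Long> segEndOffset : Arrays.asList(segmentA, segmentB)) {
                for (Map.Entry<Integer, Long> e : segEndOffset.entrySet()) {
                    long current = startOffsets.containsKey(e.getKey()) ? startOffsets.get(e.getKey()) : 0;
                    startOffsets.put(e.getKey(), Math.max(current, e.getValue()));
                }
            }

            System.out.println(startOffsets); // expected: {0=150, 1=80}
        }
    }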
http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
new file mode 100644
index 0000000..d19aa63
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/UpdateTimeRangeStep.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka.job;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+    public UpdateTimeRangeStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+        String minValue = null, maxValue = null, currentValue = null;
+        FSDataInputStream inputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            inputStream = fs.open(outputFile);
+            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+            minValue = currentValue = bufferedReader.readLine();
+            while (currentValue != null) {
+                maxValue = currentValue;
+                currentValue = bufferedReader.readLine();
+            }
+        } catch (IOException e) {
+            logger.error("fail to read file " + outputFile, e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } finally {
+            IOUtils.closeQuietly(bufferedReader);
+            IOUtils.closeQuietly(inputStream);
+        }
+
+        final DataType partitionColType = partitionCol.getType();
+        FastDateFormat dateFormat;
+        if (partitionColType.isDate()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        } else if (partitionColType.isStringFamily()) {
+            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            }
+            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+        }
+
+        try {
+            long startTime = dateFormat.parse(minValue).getTime();
+            long endTime = dateFormat.parse(maxValue).getTime();
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            segment.setDateRangeStart(startTime);
+            segment.setDateRangeEnd(endTime);
+            cubeBuilder.setToUpdateSegs(segment);
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c67fa740/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
index 7a42598..894a144 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/ByteBufferBackedInputStream.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.source.kafka.util;

 import java.io.IOException;
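The last hunk only normalizes license-header whitespace in ByteBufferBackedInputStream; the class body is unchanged and not shown here. For context, the idea behind that utility is adapting a ByteBuffer (such as a Kafka message payload) to the InputStream API. The stand-in below is an illustrative reimplementation of the pattern, not the class from this commit:

    import java.io.InputStream;
    import java.nio.ByteBuffer;

    // Illustrative stand-in for the ByteBuffer-to-InputStream pattern; the real
    // org.apache.kylin.source.kafka.util.ByteBufferBackedInputStream may differ.
    class ByteBufferInputStreamSketch extends InputStream {
        private final ByteBuffer buf;

        ByteBufferInputStreamSketch(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public int read() {
            // Next byte as an unsigned int, or -1 once the buffer is exhausted.
            return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
        }

        @Override
        public int read(byte[] bytes, int off, int len) {
            if (!buf.hasRemaining()) {
                return -1;
            }
            int n = Math.min(len, buf.remaining());
            buf.get(bytes, off, n);
            return n;
        }
    }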