Revert "Revert "KYLIN-1762 discard job when no stream message""
This reverts commit da5ba276b972f8b3c0d220252e74ac2ff73298fc. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/25f8ffc0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/25f8ffc0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/25f8ffc0 Branch: refs/heads/KYLIN-1726-2 Commit: 25f8ffc0ee16b1702723d5530b22084a608fed9a Parents: ae3d7e4 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Sep 24 14:57:01 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../job/execution/DefaultChainedExecutable.java | 6 +++ .../kylin/source/kafka/SeekOffsetStep.java | 45 +++++++++++++++----- 2 files changed, 41 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 753b389..39a5f4f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -88,6 +88,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai boolean allSucceed = true; boolean hasError = false; boolean hasRunning = false; + boolean hasDiscarded = false; for (Executable task : jobs) { final ExecutableState status = task.getStatus(); if (status == ExecutableState.ERROR) { @@ -99,6 +100,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai if (status == ExecutableState.RUNNING) { hasRunning = true; } + if (status == ExecutableState.DISCARDED) { + hasDiscarded = true; + } } if (allSucceed) { setEndTime(System.currentTimeMillis()); @@ -110,6 +114,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai notifyUserStatusChange(executableContext, ExecutableState.ERROR); } else if (hasRunning) { jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + } else if (hasDiscarded) { + jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null); } else { jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); } http://git-wip-us.apache.org/repos/asf/kylin/blob/25f8ffc0/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java index 5dca93f..479f1b8 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java @@ -17,6 +17,10 @@ */ package org.apache.kylin.source.kafka; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Maps; +import org.apache.commons.math3.util.MathUtils; import org.apache.kylin.source.kafka.util.KafkaClient; import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,6 +38,7 @@ import org.apache.kylin.source.kafka.config.KafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -101,19 +106,39 @@ public class SeekOffsetStep extends AbstractExecutable { } } - KafkaOffsetMapping.saveOffsetStart(segment, startOffsets); - KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets); + long totalStartOffset = 0, totalEndOffset = 0; + for (Long v : startOffsets.values()) { + totalStartOffset += v; + } + for (Long v : endOffsets.values()) { + totalEndOffset += v; + } - segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd())); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - cubeBuilder.setToUpdateSegs(segment); - try { - cubeManager.updateCube(cubeBuilder); + if (totalEndOffset > totalStartOffset) { + KafkaOffsetMapping.saveOffsetStart(segment, startOffsets); + KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets); + segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset)); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToUpdateSegs(segment); + try { + cubeManager.updateCube(cubeBuilder); + } catch (IOException e) { + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to update cube segment offset", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } else { + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(segment); + try { + cubeManager.updateCube(cubeBuilder); + } catch (IOException e) { + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + + return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes"); } + + } }