Revert "Revert "KYLIN-1726 Scalable streaming cubing"" This reverts commit 506cd783132023a06f1669ad248b74bf9d96d0e1.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8431af45 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8431af45 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8431af45 Branch: refs/heads/KYLIN-1726-2 Commit: 8431af45528abb2d39a69b5e762712983573e5a6 Parents: a00d1e3 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Sep 24 14:55:59 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Tue Sep 27 10:17:40 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 35 ++- .../kylin/job/streaming/Kafka10DataLoader.java | 80 +++++++ .../apache/kylin/common/KylinConfigBase.java | 1 + .../java/org/apache/kylin/cube/CubeSegment.java | 1 + .../java/org/apache/kylin/cube/ISegment.java | 39 ---- .../cube/gridtable/SegmentGTStartAndEnd.java | 2 +- .../cube/model/CubeJoinedFlatTableDesc.java | 6 + .../cube/model/CubeJoinedFlatTableEnrich.java | 6 + .../apache/kylin/gridtable/ScannerWorker.java | 2 +- .../metadata/model/IJoinedFlatTableDesc.java | 2 + .../apache/kylin/metadata/model/ISegment.java | 36 +++ .../kylin/engine/mr/BatchMergeJobBuilder2.java | 3 + .../org/apache/kylin/engine/mr/IMRInput.java | 10 + .../java/org/apache/kylin/engine/mr/MRUtil.java | 4 + .../test_streaming_table_model_desc.json | 6 +- .../kylin/provision/BuildCubeWithStream.java | 218 +++++++++++++----- .../org/apache/kylin/provision/MockKafka.java | 191 ++++++++++++++++ .../apache/kylin/provision/NetworkUtils.java | 52 +++++ pom.xml | 2 +- .../apache/kylin/source/hive/HiveMRInput.java | 11 + source-kafka/pom.xml | 13 +- .../apache/kylin/source/kafka/KafkaMRInput.java | 221 +++++++++++++++++++ .../apache/kylin/source/kafka/KafkaSource.java | 57 +++++ .../kylin/source/kafka/KafkaStreamingInput.java | 17 +- .../kylin/source/kafka/MergeOffsetStep.java | 89 ++++++++ .../kylin/source/kafka/SeekOffsetStep.java | 119 ++++++++++ .../kylin/source/kafka/UpdateTimeRangeStep.java | 108 +++++++++ .../source/kafka/config/KafkaClusterConfig.java | 3 +- .../source/kafka/hadoop/KafkaFlatTableJob.java | 165 ++++++++++++++ .../kafka/hadoop/KafkaFlatTableMapper.java | 51 +++++ .../source/kafka/hadoop/KafkaInputFormat.java | 98 ++++++++ .../kafka/hadoop/KafkaInputRecordReader.java | 166 ++++++++++++++ .../source/kafka/hadoop/KafkaInputSplit.java | 102 +++++++++ .../kylin/source/kafka/util/KafkaClient.java | 115 ++++++++++ .../source/kafka/util/KafkaOffsetMapping.java | 97 ++++++++ .../kylin/source/kafka/util/KafkaRequester.java | 7 +- .../kylin/source/kafka/util/KafkaUtils.java | 3 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 2 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 2 +- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 2 +- 40 files changed, 2024 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 8c64f91..9b282e3 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -143,14 +143,12 @@ public class DeployUtil { deployHiveTables(); } - public static void prepareTestDataForStreamingCube(long 
startTime, long endTime, String cubeName, StreamDataLoader streamDataLoader) throws IOException { + public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); - List<String> data = StreamingTableDataGenerator.generate(10000, startTime, endTime, cubeInstance.getFactTable()); - List<String> data2 = StreamingTableDataGenerator.generate(10, endTime, endTime + 300000, cubeInstance.getFactTable()); + List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getFactTable()); TableDesc tableDesc = cubeInstance.getFactTableDesc(); //load into kafka streamDataLoader.loadIntoKafka(data); - streamDataLoader.loadIntoKafka(data2); logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString()); //csv data for H2 use @@ -165,7 +163,7 @@ public class DeployUtil { sb.append(StringUtils.join(rowColumns, ",")); sb.append(System.getProperty("line.separator")); } - overrideFactTableData(sb.toString(), cubeInstance.getFactTable()); + appendFactTableData(sb.toString(), cubeInstance.getFactTable()); } public static void overrideFactTableData(String factTableContent, String factTableName) throws IOException { @@ -179,6 +177,33 @@ public class DeployUtil { in.close(); } + public static void appendFactTableData(String factTableContent, String factTableName) throws IOException { + // Write to resource store + ResourceStore store = ResourceStore.getStore(config()); + + InputStream in = new ByteArrayInputStream(factTableContent.getBytes("UTF-8")); + String factTablePath = "/data/" + factTableName + ".csv"; + + File tmpFile = File.createTempFile(factTableName, "csv"); + FileOutputStream out = new FileOutputStream(tmpFile); + + try { + if (store.exists(factTablePath)) { + InputStream oldContent = store.getResource(factTablePath).inputStream; + IOUtils.copy(oldContent, out); + } + IOUtils.copy(in, out); + IOUtils.closeQuietly(in); + + store.deleteResource(factTablePath); + in = new FileInputStream(tmpFile); + store.putResource(factTablePath, in, System.currentTimeMillis()); + } finally { + IOUtils.closeQuietly(out); + IOUtils.closeQuietly(in); + } + + } private static void deployHiveTables() throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java new file mode 100644 index 0000000..a5132af --- /dev/null +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.streaming; + +import java.util.List; +import java.util.Properties; + +import javax.annotation.Nullable; + +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kylin.source.kafka.config.BrokerConfig; +import org.apache.kylin.source.kafka.config.KafkaClusterConfig; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; + +import org.apache.kylin.source.kafka.util.KafkaClient; + +/** + * Load prepared data into Kafka (for test use) + */ +public class Kafka10DataLoader extends StreamDataLoader { + private static final Logger logger = LoggerFactory.getLogger(Kafka10DataLoader.class); + List<KafkaClusterConfig> kafkaClusterConfigs; + + public Kafka10DataLoader(KafkaConfig kafkaConfig) { + super(kafkaConfig); + this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs(); + } + + public void loadIntoKafka(List<String> messages) { + + KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0); + String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() { + @Nullable + @Override + public String apply(BrokerConfig brokerConfig) { + return brokerConfig.getHost() + ":" + brokerConfig.getPort(); + } + }), ","); + + Properties props = new Properties(); + props.put("acks", "1"); + props.put("retry.backoff.ms", "1000"); + KafkaProducer<String, String> producer = KafkaClient.getKafkaProducer(brokerList, props); + + int boundary = Math.max(1, messages.size() / 10); // guard against modulo by zero when fewer than 10 messages are loaded + for (int i = 0; i < messages.size(); ++i) { + ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i)); + producer.send(keyedMessage); + if (i % boundary == 0) { + logger.info("sending " + i + " messages to " + this.toString()); + } + } + logger.info("sent " + messages.size() + " messages to " + this.toString()); + producer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ceb188e..914f726 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -713,6 +713,7 @@ abstract public class KylinConfigBase implements Serializable { Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine.")); // ref constants in ISourceAware r.put(0, "org.apache.kylin.source.hive.HiveSource"); + r.put(1, "org.apache.kylin.source.kafka.KafkaSource"); return r; }
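This one-line registry change is what makes the new source reachable: in the kylin.source.engine map, id 0 stays bound to org.apache.kylin.source.hive.HiveSource and id 1 now resolves to the KafkaSource added later in this commit. The factory that consumes this map is not part of the diff, so the following is only a sketch of how such an id-to-class map is typically resolved via reflection; the helper class name is hypothetical, and it assumes a public no-arg constructor, which matches the "used by reflection" note on KafkaSource below.

    import java.util.Map;

    // Hypothetical helper, not Kylin's actual SourceFactory: resolves an integer
    // source id from getSourceEngines() to an instance of the registered class.
    public class SourceLookupSketch {
        public static Object createSource(Map<Integer, String> sourceEngines, int sourceType) throws Exception {
            String className = sourceEngines.get(sourceType); // e.g. 1 -> "org.apache.kylin.source.kafka.KafkaSource"
            if (className == null) {
                throw new IllegalArgumentException("unregistered source type: " + sourceType);
            }
            // instantiate through the no-arg constructor, as reflection-loaded sources provide
            return Class.forName(className).getDeclaredConstructor().newInstance();
        }
    }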
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 79397c3..afb0d28 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -37,6 +37,7 @@ import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealization; http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java b/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java deleted file mode 100644 index 2e1f214..0000000 --- a/core-cube/src/main/java/org/apache/kylin/cube/ISegment.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.cube; - -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -public interface ISegment { - - public String getName(); - - public long getDateRangeStart(); - - public long getDateRangeEnd(); - - public long getSourceOffsetStart(); - - public long getSourceOffsetEnd(); - - public DataModelDesc getModel(); - - public SegmentStatusEnum getStatus(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java index 21e01b9..b4a82d4 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/SegmentGTStartAndEnd.java @@ -24,7 +24,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.ISegment; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.dimension.AbstractDateDimEnc; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.metadata.datatype.DataType; http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 6aeb617..6ca89c8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -26,6 +26,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -162,4 +163,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { return cubeDesc.getDistributedByColumn(); } + @Override + public ISegment getSegment() { + return cubeSegment; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 5212859..8af2297 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -25,6 +25,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import 
org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -137,4 +138,9 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { return flatDesc.getDistributedBy(); } + @Override + public ISegment getSegment() { + return flatDesc.getSegment(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java index a0472e5..f26d993 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Iterator; -import org.apache.kylin.cube.ISegment; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index f3a4107..ffa2680 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -37,4 +37,6 @@ public interface IJoinedFlatTableDesc { long getSourceOffsetEnd(); TblColRef getDistributedBy(); + + ISegment getSegment(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java new file mode 100644 index 0000000..f69ae3f --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.metadata.model; + +public interface ISegment { + + public String getName(); + + public long getDateRangeStart(); + + public long getDateRangeEnd(); + + public long getSourceOffsetStart(); + + public long getSourceOffsetEnd(); + + public DataModelDesc getModel(); + + public SegmentStatusEnum getStatus(); +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index 129d525..badf628 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -34,10 +34,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { private static final Logger logger = LoggerFactory.getLogger(BatchMergeJobBuilder2.class); private final IMROutput2.IMRBatchMergeOutputSide2 outputSide; + private final IMRInput.IMRBatchMergeInputSide inputSide; public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) { super(mergeSegment, submitter); this.outputSide = MRUtil.getBatchMergeOutputSide2(seg); + this.inputSide = MRUtil.getBatchMergeInputSide(seg); } public CubingJob build() { @@ -55,6 +57,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { } // Phase 1: Merge Dictionary + inputSide.addStepPhase1_MergeDictionary(result); result.addTask(createMergeDictionaryStep(mergingSegmentIds)); result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId))); outputSide.addStepPhase1_MergeDictionary(result); http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java index 582052f..62cede9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java @@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr; import org.apache.hadoop.mapreduce.Job; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.TableDesc; /** @@ -34,6 +35,9 @@ public interface IMRInput { /** Return an InputFormat that reads from specified table. */ public IMRTableInputFormat getTableInputFormat(TableDesc table); + /** Return a helper to participate in batch cubing merge job flow. */ + public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg); + /** * Utility that configures mapper to read from a table. */ @@ -67,4 +71,10 @@ public interface IMRInput { public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); } + public interface IMRBatchMergeInputSide { + + /** Add step that executes before merge dictionary and before merge cube. 
*/ + public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow); + + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 2c3b77f..67eef5e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -71,6 +71,10 @@ public class MRUtil { return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg); } + public static IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment seg) { + return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg); + } + // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe public static int runMRJob(Tool tool, String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json index cfb889a..e6977e1 100644 --- a/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json +++ b/examples/test_case_data/localmeta/model_desc/test_streaming_table_model_desc.json @@ -4,7 +4,7 @@ "name": "test_streaming_table_model_desc", "dimensions": [ { - "table": "default.streaming_table", + "table": "DEFAULT.STREAMING_TABLE", "columns": [ "minute_start", "hour_start", @@ -20,10 +20,10 @@ "item_count" ], "last_modified": 0, - "fact_table": "default.streaming_table", + "fact_table": "DEFAULT.STREAMING_TABLE", "filter_condition": null, "partition_desc": { - "partition_date_column": "default.streaming_table.minute_start", + "partition_date_column": "DEFAULT.STREAMING_TABLE.minute_start", "partition_date_start": 0, "partition_type": "APPEND" } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 9490560..7f79acc 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,24 +20,36 @@ package org.apache.kylin.provision; import java.io.File; import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.TimeZone; import java.util.UUID; +import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.engine.streaming.OneOffStreamingBuilder; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; import org.apache.kylin.job.DeployUtil; -import org.apache.kylin.job.streaming.KafkaDataLoader; -import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.impl.threadpool.DefaultScheduler; +import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.job.streaming.Kafka10DataLoader; import org.apache.kylin.source.kafka.KafkaConfigManager; +import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; -import org.apache.kylin.storage.hbase.util.StorageCleanupJob; +import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,31 +58,123 @@ import org.slf4j.LoggerFactory; */ public class BuildCubeWithStream { - private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithStream.class); - private static final String cubeName = "test_streaming_table_cube"; - private static final long startTime = DateFormat.stringToMillis("2015-01-01 00:00:00"); - private static final long endTime = DateFormat.stringToMillis("2015-01-03 00:00:00"); - private static final long batchInterval = 16 * 60 * 60 * 1000;//16 hours + private static final Logger logger = LoggerFactory.getLogger(org.apache.kylin.provision.BuildCubeWithStream.class); - private KylinConfig kylinConfig; + private CubeManager cubeManager; + private DefaultScheduler scheduler; + protected ExecutableManager jobService; + private static final String cubeName = "test_streaming_table_cube"; - public static void main(String[] args) throws Exception { + private KafkaConfig kafkaConfig; + private MockKafka kafkaServer; - try { - beforeClass(); + public void before() throws Exception { + deployEnv(); - BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); - buildCubeWithStream.before(); - buildCubeWithStream.build(); - logger.info("Build is done"); - buildCubeWithStream.cleanup(); - logger.info("Going to exit"); - System.exit(0); - } catch (Exception e) { - logger.error("error", e); - System.exit(1); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + jobService = ExecutableManager.getInstance(kylinConfig); + scheduler = DefaultScheduler.createInstance(); + scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); + if (!scheduler.hasStarted()) { + throw 
new RuntimeException("scheduler has not been started"); } + cubeManager = CubeManager.getInstance(kylinConfig); + + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final String factTable = cubeInstance.getFactTable(); + + final StreamingManager streamingManager = StreamingManager.getInstance(kylinConfig); + final StreamingConfig streamingConfig = streamingManager.getStreamingConfig(factTable); + kafkaConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingConfig.getName()); + + String topicName = UUID.randomUUID().toString(); + String localIp = NetworkUtils.getLocalIp(); + BrokerConfig brokerConfig = kafkaConfig.getKafkaClusterConfigs().get(0).getBrokerConfigs().get(0); + brokerConfig.setHost(localIp); + kafkaConfig.setTopic(topicName); + KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(kafkaConfig); + + startEmbeddedKafka(topicName, brokerConfig); + } + + private void startEmbeddedKafka(String topicName, BrokerConfig brokerConfig) { + //Start mock Kakfa + String zkConnectionStr = "sandbox:2181"; + ZkConnection zkConnection = new ZkConnection(zkConnectionStr); + // Assert.assertEquals(ZooKeeper.States.CONNECTED, zkConnection.getZookeeperState()); + kafkaServer = new MockKafka(zkConnection, brokerConfig.getPort(), brokerConfig.getId()); + kafkaServer.start(); + + kafkaServer.createTopic(topicName, 3, 1); + kafkaServer.waitTopicUntilReady(topicName); + + MetadataResponse.TopicMetadata topicMetadata = kafkaServer.fetchTopicMeta(topicName); + Assert.assertEquals(topicName, topicMetadata.topic()); + } + + private void generateStreamData(long startTime, long endTime, int numberOfRecords) throws IOException { + Kafka10DataLoader dataLoader = new Kafka10DataLoader(kafkaConfig); + DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, numberOfRecords, cubeName, dataLoader); + logger.info("Test data inserted into Kafka"); + } + + private void clearSegment(String cubeName) throws Exception { + CubeInstance cube = cubeManager.getCube(cubeName); + // remove all existing segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + cubeManager.updateCube(cubeBuilder); + } + + public void build() throws Exception { + clearSegment(cubeName); + SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); + f.setTimeZone(TimeZone.getTimeZone("GMT")); + long date1 = 0; + long date2 = f.parse("2013-01-01").getTime(); + + int numberOfRecrods1 = 10000; + generateStreamData(date1, date2, numberOfRecrods1); + buildSegment(cubeName, 0, Long.MAX_VALUE); + + long date3 = f.parse("2013-04-01").getTime(); + int numberOfRecrods2 = 5000; + generateStreamData(date2, date3, numberOfRecrods2); + buildSegment(cubeName, 0, Long.MAX_VALUE); + + //merge + mergeSegment(cubeName, 0, 15000); + + } + + private String mergeSegment(String cubeName, long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset, true); + DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + private String refreshSegment(String cubeName, long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.refreshSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + DefaultChainedExecutable job = 
EngineFactory.createBatchCubingJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + private String buildSegment(String cubeName, long startOffset, long endOffset) throws Exception { + CubeSegment segment = cubeManager.appendSegment(cubeManager.getCube(cubeName), 0, 0, startOffset, endOffset); + DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); + jobService.addJob(job); + waitForJob(job.getId()); + return job.getId(); + } + + protected void deployEnv() throws IOException { + DeployUtil.overrideJobJarLocations(); + //DeployUtil.initCliWorkDir(); + //DeployUtil.deployMetadata(); } public static void beforeClass() throws Exception { @@ -83,44 +187,54 @@ public class BuildCubeWithStream { HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); } - protected void deployEnv() throws IOException { - DeployUtil.overrideJobJarLocations(); + public static void afterClass() throws Exception { + HBaseMetadataTestCase.staticCleanupTestMetadata(); } - public void before() throws Exception { - deployEnv(); + public void after() { + kafkaServer.stop(); + } - kylinConfig = KylinConfig.getInstanceFromEnv(); - final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); - final String factTable = cubeInstance.getFactTable(); - final StreamingConfig config = StreamingManager.getInstance(kylinConfig).getStreamingConfig(factTable); + protected void waitForJob(String jobId) { + while (true) { + AbstractExecutable job = jobService.getJob(jobId); + if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { + break; + } else { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } - //Use a random topic for kafka data stream - KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(config.getName()); - streamingConfig.setTopic(UUID.randomUUID().toString()); - KafkaConfigManager.getInstance(kylinConfig).saveKafkaConfig(streamingConfig); + public static void main(String[] args) throws Exception { + try { + beforeClass(); - DeployUtil.prepareTestDataForStreamingCube(startTime, endTime, cubeName, new KafkaDataLoader(streamingConfig)); - } + BuildCubeWithStream buildCubeWithStream = new BuildCubeWithStream(); + buildCubeWithStream.before(); + buildCubeWithStream.build(); + logger.info("Build is done"); + buildCubeWithStream.after(); + afterClass(); + logger.info("Going to exit"); + System.exit(0); + } catch (Exception e) { + logger.error("error", e); + System.exit(1); + } - public void cleanup() throws Exception { - cleanupOldStorage(); - HBaseMetadataTestCase.staticCleanupTestMetadata(); } protected int cleanupOldStorage() throws Exception { String[] args = { "--delete", "true" }; - StorageCleanupJob cli = new StorageCleanupJob(); - cli.execute(args); + // KapStorageCleanupCLI cli = new KapStorageCleanupCLI(); + // cli.execute(args); return 0; } - public void build() throws Exception { - logger.info("start time:" + startTime + " end time:" + endTime + " batch interval:" + batchInterval + " batch count:" + ((endTime - startTime) / batchInterval)); - for (long start = startTime; start < endTime; start += batchInterval) { - logger.info(String.format("build batch:{%d, %d}", start, start + batchInterval)); - new OneOffStreamingBuilder(RealizationType.CUBE, cubeName, start, start + batchInterval).build().run(); - } - } } 
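Note that both buildSegment() calls pass the placeholder offsets (0, Long.MAX_VALUE): the "Seek and update offset step" (SeekOffsetStep) that this commit adds to the job flow appears to resolve each new segment's real Kafka offsets at runtime, which is why two consecutive builds with identical nominal ranges do not collide. A condensed sketch of the test's build-then-merge flow, using only the helper methods defined in this class (dates as parsed in build(); error handling omitted):

    // Sketch only: the essential flow of build() above, inside this test class.
    public void buildTwoSegmentsThenMerge() throws Exception {
        clearSegment(cubeName);                      // start from an empty cube
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
        f.setTimeZone(TimeZone.getTimeZone("GMT"));
        long date2 = f.parse("2013-01-01").getTime();
        long date3 = f.parse("2013-04-01").getTime();

        generateStreamData(0, date2, 10000);         // first batch into Kafka
        buildSegment(cubeName, 0, Long.MAX_VALUE);   // offsets resolved by SeekOffsetStep
        generateStreamData(date2, date3, 5000);      // second batch
        buildSegment(cubeName, 0, Long.MAX_VALUE);   // second segment
        mergeSegment(cubeName, 0, 15000);            // merge the two by source-offset range
    }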
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java new file mode 100644 index 0000000..3f47923 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/MockKafka.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.provision; + +import java.io.UnsupportedEncodingException; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.kafka.common.requests.MetadataResponse; + +import kafka.admin.AdminUtils; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZkUtils; + +public class MockKafka { + private static Properties createProperties(ZkConnection zkServerConnection, String logDir, String port, String brokerId) { + Properties properties = new Properties(); + properties.put("port", port); + properties.put("broker.id", brokerId); + properties.put("log.dirs", logDir); + properties.put("host.name", "localhost"); + properties.put("offsets.topic.replication.factor", "1"); + properties.put("delete.topic.enable", "true"); + properties.put("zookeeper.connect", zkServerConnection.getServers()); + String ip = NetworkUtils.getLocalIp(); + properties.put("listeners", "PLAINTEXT://" + ip + ":" + port); + properties.put("advertised.listeners", "PLAINTEXT://" + ip + ":" + port); + return properties; + } + + private KafkaServerStartable kafkaServer; + + private ZkConnection zkConnection; + + public MockKafka(ZkConnection zkServerConnection) { + this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), "9092", "1"); + start(); + } + + private MockKafka(Properties properties) { + KafkaConfig kafkaConfig = new KafkaConfig(properties); + kafkaServer = new KafkaServerStartable(kafkaConfig); + } + + public MockKafka(ZkConnection zkServerConnection, int port, int brokerId) { + this(zkServerConnection, System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString(), String.valueOf(port), String.valueOf(brokerId)); + start(); + } + + private MockKafka(ZkConnection zkServerConnection, String logDir, String port, String brokerId) { + this(createProperties(zkServerConnection, logDir, port, brokerId)); + this.zkConnection = zkServerConnection; + System.out.println(String.format("Kafka %s:%s dir:%s", kafkaServer.serverConfig().brokerId(), kafkaServer.serverConfig().port(), kafkaServer.serverConfig().logDirs())); + } + + public void createTopic(String topic, int partition, int replication) { + ZkClient zkClient = new ZkClient(zkConnection); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + zkClient.setZkSerializer(new ZKStringSerializer()); + AdminUtils.createTopic(zkUtils, topic, partition, replication, new Properties(), null); + zkClient.close(); + } + + public void createTopic(String topic) { + this.createTopic(topic, 1, 1); + } + + public MetadataResponse.TopicMetadata fetchTopicMeta(String topic) { + ZkClient zkClient = new ZkClient(zkConnection); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + zkClient.setZkSerializer(new ZKStringSerializer()); + MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils); + zkClient.close(); + return topicMetadata; + } + + /** + * Delete may not work + * + * @param topic + */ + public void deleteTopic(String topic) { + ZkClient zkClient = new ZkClient(zkConnection); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + zkClient.setZkSerializer(new ZKStringSerializer()); + AdminUtils.deleteTopic(zkUtils, topic); + zkClient.close(); + } + + public String getConnectionString() { + return String.format("%s:%d", kafkaServer.serverConfig().hostName(), kafkaServer.serverConfig().port()); + } + + public void start() { + kafkaServer.startup(); + System.out.println("embedded kafka is up"); + } + + public void stop() { + kafkaServer.shutdown(); + System.out.println("embedded kafka down"); + } + + public MetadataResponse.TopicMetadata waitTopicUntilReady(String topic) { + boolean isReady = false; + MetadataResponse.TopicMetadata topicMeta = null; + while (!isReady) { + topicMeta = this.fetchTopicMeta(topic); + List<MetadataResponse.PartitionMetadata> partitionsMetadata = topicMeta.partitionMetadata(); + Iterator<MetadataResponse.PartitionMetadata> iterator = partitionsMetadata.iterator(); + boolean hasGotLeader = true; + boolean hasGotReplica = true; + while (iterator.hasNext()) { + MetadataResponse.PartitionMetadata partitionMeta = iterator.next(); + hasGotLeader &= (!partitionMeta.leader().isEmpty()); + if (partitionMeta.leader().isEmpty()) { + System.out.println("Partition leader is not ready, wait 1s."); + break; + } + hasGotReplica &= (!partitionMeta.replicas().isEmpty()); + if (partitionMeta.replicas().isEmpty()) { + System.out.println("Partition replica is not ready, wait 1s."); + break; + } + } + isReady = hasGotLeader && hasGotReplica; + if (!isReady) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } + return topicMeta; + } + + public String getZookeeperConnection() { + return this.zkConnection.getServers(); + } +} + +class ZKStringSerializer implements ZkSerializer { + + @Override + public byte[] serialize(Object data) throws ZkMarshallingError { + byte[] bytes = null; + try { + bytes = data.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new ZkMarshallingError(e); + } + return bytes; + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + if (bytes == null) + return null; + else + try { + return new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new ZkMarshallingError(e); + } + } + +} \ No newline at end of file
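MockKafka gives the integration test an in-process broker, removing the dependency on an external Kafka installation. A minimal usage sketch, condensed from startEmbeddedKafka() in BuildCubeWithStream above; the ZooKeeper address is the sandbox one the test assumes, and the topic name, port, and broker id are illustrative:

    import org.I0Itec.zkclient.ZkConnection;

    // Assumes the same package as MockKafka (org.apache.kylin.provision).
    public class MockKafkaUsageSketch {
        public static void main(String[] args) {
            ZkConnection zk = new ZkConnection("sandbox:2181"); // sandbox ZooKeeper, as in the test
            MockKafka kafka = new MockKafka(zk, 9092, 1);       // this constructor already calls start()
            kafka.createTopic("demo-topic", 3, 1);              // 3 partitions, replication factor 1
            kafka.waitTopicUntilReady("demo-topic");            // block until every partition has a leader
            // ... produce messages and run the cube build ...
            kafka.stop();                                       // shut down the embedded broker
        }
    }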
http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java new file mode 100644 index 0000000..98f6d04 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/provision/NetworkUtils.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.provision; + +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; + +public class NetworkUtils { + + public static String getLocalIp() { + try { + Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface iface = interfaces.nextElement(); + if (iface.isLoopback() || !iface.isUp() || iface.isVirtual() || iface.isPointToPoint()) + continue; + if (iface.getName().startsWith("vboxnet")) + continue; + + Enumeration<InetAddress> addresses = iface.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress addr = addresses.nextElement(); + final String ip = addr.getHostAddress(); + if (Inet4Address.class == addr.getClass()) + return ip; + } + } + } catch (SocketException e) { + throw new RuntimeException(e); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1fdf81a..09ef0e8 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ <!-- HBase versions --> <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version> - <kafka.version>0.8.1</kafka.version> + <kafka.version>0.10.0.0</kafka.version> <!-- Hadoop deps, keep compatible with hadoop2.version --> <zookeeper.version>3.4.6</zookeeper.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 520d7cc..09ac522 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -50,6 +50,7 @@ import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataManager; import 
org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.metadata.model.LookupDesc; import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; @@ -69,6 +70,16 @@ public class HiveMRInput implements IMRInput { return new HiveTableInputFormat(table.getIdentity()); } + @Override + public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { + return new IMRBatchMergeInputSide() { + @Override + public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { + // doing nothing + } + }; + } + public static class HiveTableInputFormat implements IMRTableInputFormat { final String dbName; final String tableName; http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index 90c2211..212f4c6 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -32,10 +32,11 @@ </parent> - <properties> - </properties> - <dependencies> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-mr</artifactId> + </dependency> <dependency> <groupId>org.apache.kylin</groupId> @@ -60,16 +61,10 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-hcatalog-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> - </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java new file mode 100644 index 0000000..cfce137 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.kafka; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.kafka.config.KafkaConfig; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +public class KafkaMRInput implements IMRInput { + + CubeSegment cubeSegment; + + @Override + public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + this.cubeSegment = (CubeSegment)flatDesc.getSegment(); + return new BatchCubingInputSide(cubeSegment); + } + + @Override + public IMRTableInputFormat getTableInputFormat(TableDesc table) { + KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); + KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(table.getIdentity()); + List<TblColRef> columns = Lists.transform(Arrays.asList(table.getColumns()), new Function<ColumnDesc, TblColRef>() { + @Nullable + @Override + public TblColRef apply(ColumnDesc input) { + return input.getRef(); + } + }); + + return new KafkaTableInputFormat(cubeSegment, columns, kafkaConfig, null); + } + + @Override + public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { + return new KafkaMRBatchMergeInputSide((CubeSegment) seg); + } + + public static class KafkaTableInputFormat implements IMRTableInputFormat { + private final CubeSegment cubeSegment; + private List<TblColRef> columns; + private StreamingParser streamingParser; + private KafkaConfig kafkaConfig; + private final JobEngineConfig conf; + + public KafkaTableInputFormat(CubeSegment cubeSegment, List<TblColRef> columns, KafkaConfig kafkaConfig, JobEngineConfig conf) { + this.cubeSegment = cubeSegment; + this.columns = columns; + this.kafkaConfig = kafkaConfig; + this.conf = conf; + } + + @Override + public void configureJob(Job job) { + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapOutputValueClass(Text.class); + String jobId = job.getConfiguration().get(BatchConstants.ARG_CUBING_JOB_ID); + IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + String inputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); + try { + 
FileInputFormat.addInputPath(job, new Path(inputPath)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + public String[] parseMapperInput(Object mapperInput) { + if (streamingParser == null) { + try { + streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException(e); + } + } + Text text = (Text) mapperInput; + ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, text.getLength()).slice(); + StreamingMessage streamingMessage = streamingParser.parse(buffer); + return streamingMessage.getData().toArray(new String[streamingMessage.getData().size()]); + } + + } + + public static class BatchCubingInputSide implements IMRBatchCubingInputSide { + + final JobEngineConfig conf; + final CubeSegment seg; + private String outputPath; + + public BatchCubingInputSide(CubeSegment seg) { + this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); + this.seg = seg; + } + + @Override + public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(createUpdateSegmentOffsetStep(jobFlow.getId())); + jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId())); + } + + public SeekOffsetStep createUpdateSegmentOffsetStep(String jobId) { + final SeekOffsetStep result = new SeekOffsetStep(); + result.setName("Seek and update offset step"); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + + return result; + } + + private MapReduceExecutable createSaveKafkaDataStep(String jobId) { + MapReduceExecutable result = new MapReduceExecutable(); + + IJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg); + outputPath = JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); + result.setName("Save data from Kafka"); + result.setMapReduceJobClass(KafkaFlatTableJob.class); + JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "system"); + StringBuilder cmd = new StringBuilder(); + jobBuilderSupport.appendMapReduceParameters(cmd); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Save_Kafka_Data_" + seg.getRealization().getName() + "_Step"); + + result.setMapReduceParams(cmd.toString()); + return result; + } + + @Override + public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + final UpdateTimeRangeStep result = new UpdateTimeRangeStep(); + result.setName("Update Segment Time Range"); + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); + JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(seg, "SYSTEM"); + result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, jobBuilderSupport.getFactDistinctColumnsPath(jobFlow.getId())); + jobFlow.addTask(result); + + } + + @Override + public IMRTableInputFormat
getFlatTableInputFormat() { + KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()); + KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(seg.getRealization().getFactTable()); + List<TblColRef> columns = new CubeJoinedFlatTableDesc(seg).getAllColumns(); + + return new KafkaTableInputFormat(seg, columns, kafkaConfig, conf); + + } + + } + + class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide { + + private CubeSegment cubeSegment; + + KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) { + this.cubeSegment = cubeSegment; + } + + @Override + public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) { + + final MergeOffsetStep result = new MergeOffsetStep(); + result.setName("Merge offset step"); + + CubingExecutableUtil.setCubeName(cubeSegment.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(cubeSegment.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobFlow.getId(), result.getParams()); + jobFlow.addTask(result); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java new file mode 100644 index 0000000..d039583 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.kafka; + +import com.google.common.collect.Lists; +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.engine.streaming.StreamingConfig; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.kafka.config.KafkaConfig; + +import java.util.List; + +//used by reflection +public class KafkaSource implements ISource { + + @SuppressWarnings("unchecked") + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + if (engineInterface == IMRInput.class) { + return (I) new KafkaMRInput(); + } else { + throw new RuntimeException("Cannot adapt to " + engineInterface); + } + } + + @Override + public ReadableTable createReadableTable(TableDesc tableDesc) { + throw new UnsupportedOperationException(); + } + + @Override + public List<String> getMRDependentResources(TableDesc table) { + List<String> dependentResources = Lists.newArrayList(); + dependentResources.add(KafkaConfig.concatResourcePath(table.getIdentity())); + dependentResources.add(StreamingConfig.concatResourcePath(table.getIdentity())); + return dependentResources; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 66142c5..78a67c2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -25,6 +25,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.base.Function; +import kafka.cluster.BrokerEndPoint; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StreamingBatch; @@ -51,6 +54,8 @@ import kafka.javaapi.FetchResponse; import kafka.javaapi.PartitionMetadata; import kafka.message.MessageAndOffset; +import javax.annotation.Nullable; + @SuppressWarnings("unused") public class KafkaStreamingInput implements IStreamingInput { @@ -136,8 +141,16 @@ public class KafkaStreamingInput implements IStreamingInput { if (partitionMetadata.errorCode() != 0) { logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); } - replicaBrokers = partitionMetadata.replicas(); - return partitionMetadata.leader(); + replicaBrokers = Lists.transform(partitionMetadata.replicas(), new Function<BrokerEndPoint, Broker>() { + @Nullable + @Override + public Broker apply(@Nullable BrokerEndPoint brokerEndPoint) { + return new Broker(brokerEndPoint, SecurityProtocol.PLAINTEXT); + } + }); + BrokerEndPoint leaderEndpoint = partitionMetadata.leader(); + + return new Broker(leaderEndpoint, SecurityProtocol.PLAINTEXT); } else { return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java new file mode 100644 index 0000000..a21b980 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/MergeOffsetStep.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; + +/** + */ +public class MergeOffsetStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class); + public MergeOffsetStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + + List<CubeSegment> mergingSegs = cube.getMergingSegments(segment); + Map<Integer, Long> mergedStartOffsets = Maps.newHashMap(); + Map<Integer, Long> mergedEndOffsets = Maps.newHashMap(); + + long dateRangeStart = Long.MAX_VALUE, dateRangeEnd = 0; + for (CubeSegment seg: mergingSegs) { + Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(seg); + Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(seg); + + for (Integer partition : startOffsets.keySet()) { + long currentStart = mergedStartOffsets.get(partition) != null ? Long.valueOf(mergedStartOffsets.get(partition)) : Long.MAX_VALUE; + long currentEnd = mergedEndOffsets.get(partition) != null ? 
Long.valueOf(mergedEndOffsets.get(partition)) : 0; + mergedStartOffsets.put(partition, Math.min(currentStart, startOffsets.get(partition))); + mergedEndOffsets.put(partition, Math.max(currentEnd, endOffsets.get(partition))); + } + dateRangeStart = Math.min(dateRangeStart, seg.getDateRangeStart()); + dateRangeEnd = Math.max(dateRangeEnd, seg.getDateRangeEnd()); + } + + KafkaOffsetMapping.saveOffsetStart(segment, mergedStartOffsets); + KafkaOffsetMapping.saveOffsetEnd(segment, mergedEndOffsets); + segment.setDateRangeStart(dateRangeStart); + segment.setDateRangeEnd(dateRangeEnd); + + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToUpdateSegs(segment); + try { + cubeManager.updateCube(cubeBuilder); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("failed to update cube segment offsets", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java new file mode 100644 index 0000000..5dca93f --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.kafka; + +import org.apache.kylin.source.kafka.util.KafkaClient; +import org.apache.kylin.source.kafka.util.KafkaOffsetMapping; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.source.kafka.config.KafkaConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + */ +public class SeekOffsetStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(SeekOffsetStep.class); + + public SeekOffsetStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + + Map<Integer, Long> startOffsets = KafkaOffsetMapping.parseOffsetStart(segment); + Map<Integer, Long> endOffsets = KafkaOffsetMapping.parseOffsetEnd(segment); + + if (startOffsets.size() > 0 && endOffsets.size() > 0 && startOffsets.size() == endOffsets.size()) { + return new ExecuteResult(ExecuteResult.State.SUCCEED, "skipped, as the offset is provided."); + } + + final KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(context.getConfig()).getKafkaConfig(cube.getFactTable()); + final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig); + final String topic = kafkaConfig.getTopic(); + try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); + + if (startOffsets.isEmpty()) { + // user didn't specify a start offset; use the largest end offset in existing segments as the start + for (CubeSegment seg : cube.getSegments()) { + Map<Integer, Long> segEndOffset = KafkaOffsetMapping.parseOffsetEnd(seg); + for (PartitionInfo partition : partitionInfos) { + int partitionId = partition.partition(); + if (segEndOffset.containsKey(partitionId)) { + startOffsets.put(partitionId, Math.max(startOffsets.containsKey(partitionId) ? 
startOffsets.get(partitionId) : 0, segEndOffset.get(partitionId))); + } + } + } + + if (partitionInfos.size() > startOffsets.size()) { + // new partitions have been added; start them from their earliest offset + for (int x = startOffsets.size(); x < partitionInfos.size(); x++) { + long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); + startOffsets.put(partitionInfos.get(x).partition(), earliest); + } + } + + logger.info("Got start offsets for segment " + segment.getName() + ": " + startOffsets.toString()); + } + + if (endOffsets.isEmpty()) { + // user didn't specify an end offset; use the latest offsets in Kafka + for (PartitionInfo partitionInfo : partitionInfos) { + long latest = KafkaClient.getLatestOffset(consumer, topic, partitionInfo.partition()); + endOffsets.put(partitionInfo.partition(), latest); + } + + logger.info("Got end offsets for segment " + segment.getName() + ": " + endOffsets.toString()); + } + } + + KafkaOffsetMapping.saveOffsetStart(segment, startOffsets); + KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets); + + segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd())); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToUpdateSegs(segment); + try { + cubeManager.updateCube(cubeBuilder); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("failed to update cube segment offsets", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java new file mode 100644 index 0000000..bb64bf9 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.kafka; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class UpdateTimeRangeStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class); + + public UpdateTimeRangeStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); + final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); + final Path outputFile = new Path(outputPath, partitionCol.getName()); + + String minValue = null, maxValue = null, currentValue = null; + try (FileSystem fs = HadoopUtil.getFileSystem(outputPath); FSDataInputStream inputStream = fs.open(outputFile); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + minValue = currentValue = bufferedReader.readLine(); + while (currentValue != null) { + maxValue = currentValue; + currentValue = bufferedReader.readLine(); + } + } catch (IOException e) { + logger.error("fail to read file " + outputFile, e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + + final DataType partitionColType = partitionCol.getType(); + FastDateFormat dateFormat; + if (partitionColType.isDate()) { + dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); + } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { + dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); + } else if (partitionColType.isStringFamily()) { + String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat(); + if (StringUtils.isEmpty(partitionDateFormat)) { + partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN; + } + dateFormat = DateFormat.getDateFormat(partitionDateFormat); + } else { + return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type"); + } + + try { + long startTime = dateFormat.parse(minValue).getTime(); + long endTime = 
dateFormat.parse(maxValue).getTime(); + CubeUpdate cubeBuilder = new CubeUpdate(cube); + segment.setDateRangeStart(startTime); + segment.setDateRangeEnd(endTime); + cubeBuilder.setToUpdateSegs(segment); + cubeManager.updateCube(cubeBuilder); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (Exception e) { + logger.error("failed to update cube segment time range", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8431af45/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java index 04a66f6..95349c2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Nullable; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; @@ -67,7 +68,7 @@ public class KafkaClusterConfig extends RootPersistentEntity { @Nullable @Override public Broker apply(BrokerConfig input) { - return new Broker(input.getId(), input.getHost(), input.getPort()); + return new Broker(input.getId(), input.getHost(), input.getPort(), SecurityProtocol.PLAINTEXT); } }); }
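----------------------------------------------------------------------
Note on the offset-merge logic in MergeOffsetStep above: for each Kafka partition, the merged segment takes the smallest start offset and the largest end offset across the segments being merged, and the date range is widened the same way. A minimal, self-contained sketch of that per-partition rule follows; the class and helper names are illustrative only, not part of this patch:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the per-partition offset merge used by MergeOffsetStep.
public class OffsetMergeSketch {

    // Fold one segment's [start, end) offsets per partition into the running merge:
    // merged start = min of starts, merged end = max of ends.
    static void accumulate(Map<Integer, Long> segStart, Map<Integer, Long> segEnd, Map<Integer, Long> mergedStart, Map<Integer, Long> mergedEnd) {
        for (Map.Entry<Integer, Long> entry : segStart.entrySet()) {
            int partition = entry.getKey();
            long currentStart = mergedStart.containsKey(partition) ? mergedStart.get(partition) : Long.MAX_VALUE;
            long currentEnd = mergedEnd.containsKey(partition) ? mergedEnd.get(partition) : 0L;
            mergedStart.put(partition, Math.min(currentStart, entry.getValue()));
            mergedEnd.put(partition, Math.max(currentEnd, segEnd.get(partition)));
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> mergedStart = new HashMap<Integer, Long>();
        Map<Integer, Long> mergedEnd = new HashMap<Integer, Long>();
        // Segment A covers partition 0 offsets [100, 200); segment B covers [200, 350).
        accumulate(singleton(0, 100L), singleton(0, 200L), mergedStart, mergedEnd);
        accumulate(singleton(0, 200L), singleton(0, 350L), mergedStart, mergedEnd);
        System.out.println(mergedStart + " -> " + mergedEnd); // prints {0=100} -> {0=350}
    }

    private static Map<Integer, Long> singleton(int partition, long offset) {
        Map<Integer, Long> m = new HashMap<Integer, Long>();
        m.put(partition, offset);
        return m;
    }
}

The merged segment thus covers the union of the source segments' offset ranges, which is why SeekOffsetStep can later treat any existing segment's end offset as a safe start for the next build.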