http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index d4308db..7db8285 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -1,37 +1,20 @@ /* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ package org.apache.kylin.source.kafka; import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java index 4145ef6..d84d3db 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TopicMeta.java @@ -1,37 +1,20 @@ /* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ package org.apache.kylin.source.kafka; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java new file mode 100644 index 0000000..bb64bf9 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/UpdateTimeRangeStep.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that sets a cube segment's date range from the partition column values stored in
+ * {@code <CFG_OUTPUT_PATH>/<partition column name>} (presumably written by an earlier step of
+ * the same job).
+ * <p>
+ * The first line of that file becomes the range start and the last line the range end; both
+ * are parsed with a date format chosen from the partition column's data type.
+ */
+public class UpdateTimeRangeStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateTimeRangeStep.class);
+
+    public UpdateTimeRangeStep() {
+        super();
+    }
+
+    /**
+     * Reads the min/max partition values and saves them as the segment's date range.
+     *
+     * @param context execution context supplying the Kylin configuration
+     * @return SUCCEED when the segment was updated; ERROR when the value file cannot be read or
+     *         is empty, the partition column type is unsupported, or parsing/updating fails
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path outputFile = new Path(outputPath, partitionCol.getName());
+
+        String minValue = null;
+        String maxValue = null;
+        try {
+            // Deliberately NOT a try-with-resources resource: HadoopUtil.getFileSystem presumably
+            // returns Hadoop's JVM-wide cached FileSystem (FileSystem.get), and closing it would
+            // break every other user of that instance in this process. Only the streams are closed.
+            final FileSystem fs = HadoopUtil.getFileSystem(outputPath);
+            try (FSDataInputStream inputStream = fs.open(outputFile);
+                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+                // Assumes the values were written in ascending order: first line = min, last line = max.
+                minValue = bufferedReader.readLine();
+                for (String line = minValue; line != null; line = bufferedReader.readLine()) {
+                    maxValue = line;
+                }
+            }
+        } catch (IOException e) {
+            logger.error("fail to read file " + outputFile, e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+
+        if (minValue == null) {
+            // Without this check an empty file would fail later with an uninformative exception from parse().
+            return new ExecuteResult(ExecuteResult.State.ERROR, "No partition column value found in " + outputFile);
+        }
+
+        // Choose how to parse the values based on the partition column's declared type.
+        final DataType partitionColType = partitionCol.getType();
+        FastDateFormat dateFormat;
+        if (partitionColType.isDate()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
+        } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
+            dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+        } else if (partitionColType.isStringFamily()) {
+            // String partition columns use the model's configured format, defaulting to the date pattern.
+            String partitionDateFormat = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                partitionDateFormat = DateFormat.DEFAULT_DATE_PATTERN;
+            }
+            dateFormat = DateFormat.getDateFormat(partitionDateFormat);
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "Type " + partitionColType + " is not valid partition column type");
+        }
+
+        try {
+            long startTime = dateFormat.parse(minValue).getTime();
+            long endTime = dateFormat.parse(maxValue).getTime();
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            segment.setDateRangeStart(startTime);
+            segment.setDateRangeEnd(endTime);
+            cubeBuilder.setToUpdateSegs(segment);
+            cubeManager.updateCube(cubeBuilder);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (Exception e) {
+            logger.error("fail to update cube segment date range", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java index 04a66f6..95349c2 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Nullable; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; @@ -67,7 +68,7 @@ public class KafkaClusterConfig extends RootPersistentEntity { @Nullable @Override public Broker apply(BrokerConfig input) { - return new Broker(input.getId(), input.getHost(), input.getPort()); + return new Broker(input.getId(), input.getHost(), input.getPort(), SecurityProtocol.PLAINTEXT); } }); } http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java new file mode 100644 index 0000000..decfb60 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableJob.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Run a Hadoop Job to process the stream data in kafka;
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ * <p>
+ * The job is map-only: {@link KafkaInputFormat} builds splits from the per-partition offset
+ * range recorded on the cube segment, and {@link KafkaFlatTableMapper} writes the messages to a
+ * compressed SequenceFile under the output path.
+ */
+public class KafkaFlatTableJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(KafkaFlatTableJob.class);
+
+    // NOTE: "PARITION" is misspelled, but these constants are public API and are kept as-is.
+    /** Lowest partition id that has configured offsets. */
+    public static final String CONFIG_KAFKA_PARITION_MIN = "kafka.partition.min";
+    /** Highest partition id that has configured offsets. */
+    public static final String CONFIG_KAFKA_PARITION_MAX = "kafka.partition.max";
+    /** Key prefix; the partition id is appended to form that partition's start-offset key. */
+    public static final String CONFIG_KAFKA_PARITION_START = "kafka.partition.start.";
+    /** Key prefix; the partition id is appended to form that partition's end-offset key. */
+    public static final String CONFIG_KAFKA_PARITION_END = "kafka.partition.end.";
+
+    public static final String CONFIG_KAFKA_BROKERS = "kafka.brokers";
+    public static final String CONFIG_KAFKA_TOPIC = "kafka.topic";
+    public static final String CONFIG_KAFKA_TIMEOUT = "kafka.connect.timeout";
+    public static final String CONFIG_KAFKA_BUFFER_SIZE = "kafka.connect.buffer.size";
+    public static final String CONFIG_KAFKA_CONSUMER_GROUP = "kafka.consumer.group";
+    public static final String CONFIG_KAFKA_INPUT_FORMAT = "input.format";
+    public static final String CONFIG_KAFKA_PARSER_NAME = "kafka.parser.name";
+
+    /**
+     * Configures and runs the map-only job that copies the segment's Kafka offset range to the
+     * output path.
+     *
+     * @param args command-line arguments: job name, cube name, output path and segment name
+     * @return the result of {@code waitForCompletion(job)}
+     * @throws IllegalArgumentException if the cube, its Kafka config or the NEW segment cannot be
+     *         found, or the Kafka brokers/topic are missing
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_NAME);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            if (cube == null) {
+                throw new IllegalArgumentException("Cube " + cubeName + " not found");
+            }
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv());
+            KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(cube.getFactTable());
+            if (kafkaConfig == null) {
+                throw new IllegalArgumentException("No Kafka config found for table " + cube.getFactTable());
+            }
+            String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
+            String topic = kafkaConfig.getTopic();
+
+            if (brokers == null || brokers.length() == 0 || topic == null) {
+                throw new IllegalArgumentException("Invalid Kafka information, brokers " + brokers + ", topic " + topic);
+            }
+
+            job.getConfiguration().set(CONFIG_KAFKA_BROKERS, brokers);
+            job.getConfiguration().set(CONFIG_KAFKA_TOPIC, topic);
+            job.getConfiguration().set(CONFIG_KAFKA_TIMEOUT, String.valueOf(kafkaConfig.getTimeout()));
+            job.getConfiguration().set(CONFIG_KAFKA_BUFFER_SIZE, String.valueOf(kafkaConfig.getBufferSize()));
+            job.getConfiguration().set(CONFIG_KAFKA_INPUT_FORMAT, "json");
+            job.getConfiguration().set(CONFIG_KAFKA_PARSER_NAME, kafkaConfig.getParserName());
+            // use cubeName as consumer group name
+            job.getConfiguration().set(CONFIG_KAFKA_CONSUMER_GROUP, cubeName);
+
+            CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+            if (segment == null) {
+                throw new IllegalArgumentException("No NEW segment named " + segmentName + " in cube " + cubeName);
+            }
+            // setupMapper also makes the job map-only (0 reduce tasks).
+            setupMapper(segment);
+
+            FileOutputFormat.setOutputPath(job, output);
+            FileOutputFormat.setCompressOutput(job, true);
+            logger.info("Output hdfs location: " + output);
+            logger.info("Output hdfs compression: " + true);
+            job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+            deletePath(job.getConfiguration(), output);
+
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } catch (Exception e) {
+            logger.error("error in KafkaFlatTableJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null) {
+                cleanupTempConfFile(job.getConfiguration());
+            }
+        }
+    }
+
+    /**
+     * Copies the segment's per-partition Kafka offset range into the job configuration and
+     * configures the map-only job (Kafka input, SequenceFile output of Text key/value pairs).
+     *
+     * @param cubeSeg the segment whose offsets are read via {@link KafkaOffsetMapping}
+     * @throws IllegalStateException if the segment has no start offsets, or a partition has a
+     *         start offset but no end offset
+     */
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        // set the segment's offset info to job conf
+        Map<Integer, Long> offsetStart = KafkaOffsetMapping.parseOffsetStart(cubeSeg);
+        Map<Integer, Long> offsetEnd = KafkaOffsetMapping.parseOffsetEnd(cubeSeg);
+
+        // Collections.min/max would otherwise fail with an unhelpful NoSuchElementException.
+        if (offsetStart.isEmpty()) {
+            throw new IllegalStateException("No Kafka partition offsets recorded for segment " + cubeSeg);
+        }
+
+        Integer minPartition = Collections.min(offsetStart.keySet());
+        Integer maxPartition = Collections.max(offsetStart.keySet());
+        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MIN, minPartition.toString());
+        job.getConfiguration().set(CONFIG_KAFKA_PARITION_MAX, maxPartition.toString());
+
+        for (Map.Entry<Integer, Long> entry : offsetStart.entrySet()) {
+            Integer partition = entry.getKey();
+            Long end = offsetEnd.get(partition);
+            if (end == null) {
+                throw new IllegalStateException("No end offset for partition " + partition + " of segment " + cubeSeg);
+            }
+            job.getConfiguration().set(CONFIG_KAFKA_PARITION_START + partition, entry.getValue().toString());
+            job.getConfiguration().set(CONFIG_KAFKA_PARITION_END + partition, end.toString());
+        }
+
+        job.setMapperClass(KafkaFlatTableMapper.class);
+        job.setInputFormatClass(KafkaInputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setNumReduceTasks(0);
+    }
+
+    public static void main(String[] args) throws Exception {
+        KafkaFlatTableJob job = new KafkaFlatTableJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java new file mode 100644 index 0000000..995b2d4 --- /dev/null +++
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaFlatTableMapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * Mapper that re-emits each Kafka message as a {@code Text} key/value pair.
+ * <p>
+ * The output key holds the input key (the message offset) encoded with
+ * {@link Bytes#toBytes(long)}; the output value holds the raw message bytes.
+ */
+public class KafkaFlatTableMapper extends KylinMapper<LongWritable, BytesWritable, Text, Text> {
+
+    // Reused across map() calls to avoid allocating per record.
+    private final Text outKey = new Text();
+    private final Text outValue = new Text();
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+    }
+
+    /**
+     * Writes one output record per input message.
+     *
+     * @param key     the message offset, as set by the input format's record reader
+     * @param value   the message payload
+     * @param context the task context records are written to
+     * @throws InterruptedIOException if the task is interrupted while writing; the thread's
+     *         interrupt status is restored before throwing
+     */
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context) throws IOException {
+        try {
+            outKey.set(Bytes.toBytes(key.get()));
+            // getBytes() returns the backing buffer, which may be longer than the valid data.
+            outValue.set(value.getBytes(), 0, value.getLength());
+            context.write(outKey, outValue);
+        } catch (InterruptedException e) {
+            // Do not swallow: restore the interrupt flag and fail the record instead of silently dropping it.
+            Thread.currentThread().interrupt();
+            InterruptedIOException ioe = new InterruptedIOException("Interrupted while writing message at offset " + key.get());
+            ioe.initCause(e);
+            throw ioe;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java new file mode 100644 index 0000000..81f6bac --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputFormat.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kylin.source.kafka.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.source.kafka.util.KafkaClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+
+/**
+ * Convert Kafka topic to Hadoop InputFormat
+ * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader
+ * <p>
+ * One {@link KafkaInputSplit} is created for each topic partition whose configured end offset
+ * (set by {@link KafkaFlatTableJob}) is greater than its start offset.
+ */
+public class KafkaInputFormat extends InputFormat<LongWritable, BytesWritable> {
+
+    /**
+     * Builds the splits from the per-partition offsets in the job configuration, cross-checked
+     * against the partitions the broker reports for the topic.
+     *
+     * @throws IllegalStateException if the topic's partitions cannot be fetched, or the broker
+     *         reports a partition that has no configured offsets
+     * @throws IllegalArgumentException if the broker's partition count differs from the number
+     *         of configured partitions
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        String brokers = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BROKERS);
+        String inputTopic = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TOPIC);
+        String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP);
+        Integer partitionMin = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MIN));
+        Integer partitionMax = Integer.valueOf(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_MAX));
+
+        // Collect offsets for ids in [min, max]; ids lacking either a start or an end key are skipped.
+        Map<Integer, Long> startOffsetMap = Maps.newHashMap();
+        Map<Integer, Long> endOffsetMap = Maps.newHashMap();
+        for (int i = partitionMin; i <= partitionMax; i++) {
+            String start = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_START + i);
+            String end = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_PARITION_END + i);
+            if (start != null && end != null) {
+                startOffsetMap.put(i, Long.valueOf(start));
+                endOffsetMap.put(i, Long.valueOf(end));
+            }
+        }
+
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try (KafkaConsumer<String, String> consumer = KafkaClient.getKafkaConsumer(brokers, consumerGroup, null)) {
+            List<PartitionInfo> partitionInfos = consumer.partitionsFor(inputTopic);
+            // Some Kafka client versions return null (not an empty list) for an unknown topic.
+            Preconditions.checkState(partitionInfos != null, "Topic '%s' not found on brokers %s", inputTopic, brokers);
+            Preconditions.checkArgument(partitionInfos.size() == startOffsetMap.size(), "partition number mismatch with server side");
+            for (PartitionInfo partition : partitionInfos) {
+                int partitionId = partition.partition();
+                if (!startOffsetMap.containsKey(partitionId)) {
+                    throw new IllegalStateException("Partition '" + partitionId + "' not exists.");
+                }
+
+                Long start = startOffsetMap.get(partitionId);
+                Long end = endOffsetMap.get(partitionId);
+                // Partitions with nothing to read get no split.
+                if (end > start) {
+                    splits.add(new KafkaInputSplit(brokers, inputTopic, partitionId, start, end));
+                }
+            }
+        }
+        return splits;
+    }
+
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+            InputSplit split, TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        return new KafkaInputRecordReader();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java new file mode 100644 index 0000000..f67fef5 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputRecordReader.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.source.kafka.hadoop; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kylin.common.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Convert Kafka topic to Hadoop InputFormat + * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader + */ +public class KafkaInputRecordReader extends RecordReader<LongWritable, BytesWritable> { + + static Logger log = LoggerFactory.getLogger(KafkaInputRecordReader.class); + + private Configuration conf; + + private KafkaInputSplit split; + private Consumer consumer; + private String brokers; + private String topic; + + private int partition; + private long earliestOffset; + private long watermark; + 
private long latestOffset; + + private ConsumerRecords<String, String> messages; + private Iterator<ConsumerRecord<String, String>> iterator; + private LongWritable key; + private BytesWritable value; + + private long timeOut = 60000; + private long bufferSize = 65536; + + private long numProcessedMessages = 0L; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + initialize(split, context.getConfiguration()); + } + + public void initialize(InputSplit split, Configuration conf) throws IOException, InterruptedException { + this.conf = conf; + this.split = (KafkaInputSplit) split; + brokers = this.split.getBrokers(); + topic = this.split.getTopic(); + partition = this.split.getPartition(); + watermark = this.split.getOffsetStart(); + + if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT) != null) { + timeOut = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_TIMEOUT)); + } + if (conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE) != null) { + bufferSize = Long.parseLong(conf.get(KafkaFlatTableJob.CONFIG_KAFKA_BUFFER_SIZE)); + } + + String consumerGroup = conf.get(KafkaFlatTableJob.CONFIG_KAFKA_CONSUMER_GROUP); + consumer = org.apache.kylin.source.kafka.util.KafkaClient.getKafkaConsumer(brokers, consumerGroup, null); + + earliestOffset = this.split.getOffsetStart(); + latestOffset = this.split.getOffsetEnd(); + TopicPartition topicPartition = new TopicPartition(topic, partition); + consumer.assign(Arrays.asList(topicPartition)); + log.info("Split {} Topic: {} Broker: {} Partition: {} Start: {} End: {}", new Object[] { this.split, topic, this.split.getBrokers(), partition, earliestOffset, latestOffset }); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) { + key = new LongWritable(); + } + if (value == null) { + value = new BytesWritable(); + } + + if (messages == null) { + log.info("{} fetching offset {} ", topic + ":" + 
split.getBrokers() + ":" + partition, watermark); + TopicPartition topicPartition = new TopicPartition(topic, partition); + consumer.seek(topicPartition, watermark); + messages = consumer.poll(timeOut); + iterator = messages.iterator(); + if (!iterator.hasNext()) { + log.info("No more messages, stop"); + throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); + } + } + + if (iterator.hasNext()) { + ConsumerRecord<String, String> message = iterator.next(); + if (message.offset() >= latestOffset) { + log.info("Reach the end offset, stop reading."); + return false; + } + key.set(message.offset()); + byte[] valuebytes = Bytes.toBytes(message.value()); + value.set(valuebytes, 0, valuebytes.length); + watermark = message.offset() + 1; + numProcessedMessages++; + if (!iterator.hasNext()) { + messages = null; + iterator = null; + } + return true; + } + + log.error("Unexpected iterator end."); + throw new IOException(String.format("Unexpected ending of stream, expected ending offset %d, but end at %d", latestOffset, watermark)); + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public BytesWritable getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + if (watermark >= latestOffset || earliestOffset == latestOffset) { + return 1.0f; + } + return Math.min(1.0f, (watermark - earliestOffset) / (float) (latestOffset - earliestOffset)); + } + + @Override + public void close() throws IOException { + log.info("{} num. 
processed messages {} ", topic + ":" + split.getBrokers() + ":" + partition, numProcessedMessages); + consumer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java new file mode 100644 index 0000000..3261399 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/hadoop/KafkaInputSplit.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.source.kafka.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Convert Kafka topic to Hadoop InputFormat + * Modified from the kafka-hadoop-loader in https://github.com/amient/kafka-hadoop-loader + */ +public class KafkaInputSplit extends InputSplit implements Writable { + + private String brokers; + private String topic; + private int partition; + private long offsetStart; + private long offsetEnd; + + public KafkaInputSplit() { + } + + public KafkaInputSplit(String brokers, String topic, int partition, long offsetStart, long offsetEnd) { + this.brokers = brokers; + this.topic = topic; + this.partition = partition; + this.offsetStart = offsetStart; + this.offsetEnd = offsetEnd; + } + + public void readFields(DataInput in) throws IOException { + brokers = Text.readString(in); + topic = Text.readString(in); + partition = in.readInt(); + offsetStart = in.readLong(); + offsetEnd = in.readLong(); + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, brokers); + Text.writeString(out, topic); + out.writeInt(partition); + out.writeLong(offsetStart); + out.writeLong(offsetEnd); + } + + @Override + public long getLength() throws IOException, InterruptedException { + return Long.MAX_VALUE; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{brokers}; + } + + public int getPartition() { + return partition; + } + + public String getTopic() { + return topic; + } + + public String getBrokers() { + return brokers; + } + + public long getOffsetStart() { + return offsetStart; + } + + public long getOffsetEnd() { + return offsetEnd; + } + + @Override + public String toString() { + return brokers + "-" + topic + "-" + partition + "-" + offsetStart + "-" + offsetEnd; + } +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java new file mode 100644 index 0000000..640cc53 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.kafka.util; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kylin.source.kafka.config.BrokerConfig; +import org.apache.kylin.source.kafka.config.KafkaClusterConfig; +import org.apache.kylin.source.kafka.config.KafkaConfig; + +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +/** + */ +public class KafkaClient { + + public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGroup, Properties properties) { + Properties props = constructDefaultKafkaConsumerProperties(brokers, consumerGroup, properties); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); + return consumer; + } + + public static KafkaProducer getKafkaProducer(String brokers, Properties properties) { + Properties props = constructDefaultKafkaProducerProperties(brokers, properties); + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); + return producer; + } + + private static Properties constructDefaultKafkaProducerProperties(String brokers, Properties properties){ + Properties props = new Properties(); + props.put("bootstrap.servers", brokers); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("acks", "1"); + props.put("buffer.memory", 33554432); + props.put("retries", 0); + props.put("batch.size", 16384); + props.put("linger.ms", 50); + props.put("timeout.ms", "30000"); + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + return props; + } + + private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) { + Properties props = new Properties(); + 
props.put("bootstrap.servers", brokers); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("group.id", consumerGroup); + props.put("session.timeout.ms", "30000"); + props.put("enable.auto.commit", "false"); + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + props.put(entry.getKey(), entry.getValue()); + } + } + return props; + } + + public static String getKafkaBrokers(KafkaConfig kafkaConfig) { + String brokers = null; + for (KafkaClusterConfig clusterConfig : kafkaConfig.getKafkaClusterConfigs()) { + for (BrokerConfig brokerConfig : clusterConfig.getBrokerConfigs()) { + if (brokers == null) { + brokers = brokerConfig.getHost() + ":" + brokerConfig.getPort(); + } else { + brokers = brokers + "," + brokerConfig.getHost() + ":" + brokerConfig.getPort(); + } + } + } + return brokers; + } + + public static long getEarliestOffset(KafkaConsumer consumer, String topic, int partitionId) { + + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + consumer.assign(Arrays.asList(topicPartition)); + consumer.seekToBeginning(Arrays.asList(topicPartition)); + + return consumer.position(topicPartition); + } + + public static long getLatestOffset(KafkaConsumer consumer, String topic, int partitionId) { + + TopicPartition topicPartition = new TopicPartition(topic, partitionId); + consumer.assign(Arrays.asList(topicPartition)); + consumer.seekToEnd(Arrays.asList(topicPartition)); + + return consumer.position(topicPartition); + } + + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java new file mode 100644 index 0000000..b46e57f --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaOffsetMapping.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.kafka.util; + +import com.google.common.collect.Maps; +import org.apache.kylin.cube.CubeSegment; + +import java.util.Map; + +/** + */ +public class KafkaOffsetMapping { + + public static final String OFFSET_START = "kafka.offset.start."; + public static final String OFFSET_END = "kafka.offset.end."; + + /** + * Get the start offsets for each partition from a segment + * + * @param segment + * @return + */ + public static Map<Integer, Long> parseOffsetStart(CubeSegment segment) { + return parseOffset(segment, OFFSET_START); + } + + /** + * Get the end offsets for each partition from a segment + * + * @param segment + * @return + */ + public static Map<Integer, Long> parseOffsetEnd(CubeSegment segment) { + return parseOffset(segment, OFFSET_END); + } + + /** + * Save the partition start offset to cube segment + * + * @param segment + * @param offsetStart + */ + public static void saveOffsetStart(CubeSegment segment, Map<Integer, Long> offsetStart) { + long sourceOffsetStart = 0; + for (Integer partition : offsetStart.keySet()) { + segment.getAdditionalInfo().put(OFFSET_START + partition, String.valueOf(offsetStart.get(partition))); + sourceOffsetStart += offsetStart.get(partition); + } + + segment.setSourceOffsetStart(sourceOffsetStart); + } + + /** + * Save the partition end offset to cube segment + * + * @param segment + * @param offsetEnd + */ + public static void saveOffsetEnd(CubeSegment segment, Map<Integer, Long> offsetEnd) { + long sourceOffsetEnd = 0; + for (Integer partition : offsetEnd.keySet()) { + segment.getAdditionalInfo().put(OFFSET_END + partition, String.valueOf(offsetEnd.get(partition))); + sourceOffsetEnd += offsetEnd.get(partition); + } + + segment.setSourceOffsetEnd(sourceOffsetEnd); + } + + private static Map<Integer, Long> parseOffset(CubeSegment segment, String propertyPrefix) { + final Map<Integer, Long> offsetStartMap = Maps.newHashMap(); + for (String key : segment.getAdditionalInfo().keySet()) { + if 
(key.startsWith(propertyPrefix)) { + Integer partition = Integer.valueOf(key.substring(propertyPrefix.length())); + Long offset = Long.valueOf(segment.getAdditionalInfo().get(key)); + offsetStartMap.put(partition, offset); + } + } + + + return offsetStartMap; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java index 58cba7d..ddc2eb7 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java @@ -1,37 +1,20 @@ /* - * - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * - * contributor license agreements. See the NOTICE file distributed with - * - * this work for additional information regarding copyright ownership. - * - * The ASF licenses this file to You under the Apache License, Version 2.0 - * - * (the "License"); you may not use this file except in compliance with - * - * the License. You may obtain a copy of the License at - * - * - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * - * - * Unless required by applicable law or agreed to in writing, software - * - * distributed under the License is distributed on an "AS IS" BASIS, - * - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * - * See the License for the specific language governing permissions and - * - * limitations under the License. - * - * / - */ - + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ package org.apache.kylin.source.kafka.util; import java.util.Collections; @@ -42,6 +25,8 @@ import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; +import kafka.cluster.BrokerEndPoint; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.source.kafka.TopicMeta; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.slf4j.Logger; @@ -86,13 +71,14 @@ public final class KafkaRequester { if (consumerCache.containsKey(key)) { return consumerCache.get(key); } else { - consumerCache.putIfAbsent(key, new SimpleConsumer(broker.host(), broker.port(), timeout, bufferSize, clientId)); + BrokerEndPoint brokerEndPoint = broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT); + consumerCache.putIfAbsent(key, new SimpleConsumer(brokerEndPoint.host(), brokerEndPoint.port(), timeout, bufferSize, clientId)); return consumerCache.get(key); } } private static String createKey(Broker broker, int timeout, int bufferSize, String clientId) { - return broker.getConnectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId; + return broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).connectionString() + "_" + timeout + "_" + bufferSize + "_" + clientId; } public static TopicMeta getKafkaTopicMeta(KafkaClusterConfig kafkaClusterConfig) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index 24eaa05..ee5bb20 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; +import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.source.kafka.StreamingParser; @@ -55,7 +56,7 @@ public final class KafkaUtils { if (partitionMetadata.errorCode() != 0) { logger.warn("PartitionMetadata errorCode: " + partitionMetadata.errorCode()); } - return partitionMetadata.leader(); + return new Broker(partitionMetadata.leader(), SecurityProtocol.PLAINTEXT); } else { return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index c7de287..f285153 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -39,7 +39,7 @@ import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; import 
org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.ISegment; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTScanRequest; http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index c318cba..da087c9 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -31,7 +31,7 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.ISegment; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.FuzzyKeyEncoder; import org.apache.kylin.cube.kv.FuzzyMaskEncoder; http://git-wip-us.apache.org/repos/asf/kylin/blob/81c7323b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index f1e5dab..5692000 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ 
-32,7 +32,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.ShardingHash; -import org.apache.kylin.cube.ISegment; +import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.dimension.DimensionEncoding;