Add KafkaSampleProducer for streaming live demo
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/147986be Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/147986be Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/147986be Branch: refs/heads/1.4-rc Commit: 147986be09750b4e0f11a2d3f3e652077322a425 Parents: 0b26522 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Feb 19 16:01:36 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Feb 19 16:15:04 2016 +0800 ---------------------------------------------------------------------- build/bin/streaming_build.sh | 3 +- .../source/kafka/util/KafkaSampleProducer.java | 99 ++++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/build/bin/streaming_build.sh ---------------------------------------------------------------------- diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh index cb86e29..a96ecc1 100644 --- a/build/bin/streaming_build.sh +++ b/build/bin/streaming_build.sh @@ -23,7 +23,6 @@ source ~/.bash_profile STREAMING=$1 INTERVAL=$2 DELAY=$3 -MARGIN=$4 CURRENT_TIME_IN_SECOND=`date +%s` CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000)) START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY)) @@ -31,4 +30,4 @@ END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL)) ID="$START"_"$END" echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log -sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -oneoff true -start ${START} -end ${END} -streaming ${STREAMING} -margin ${MARGIN} \ No newline at end of file +sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${STREAMING} ${ID} -start ${START} -end ${END} -streaming ${STREAMING} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/147986be/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java new file mode 100644 index 0000000..1846157 --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java @@ -0,0 +1,99 @@ +package org.apache.kylin.source.kafka.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.kylin.common.util.OptionsHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * A sample producer which will create sample data to kafka topic + */ +public class KafkaSampleProducer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaSampleProducer.class); + @SuppressWarnings("static-access") + private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic"); + private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker"); + private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay").create("delay"); + + private static final ObjectMapper mapper = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + logger.info("args: " + Arrays.toString(args)); + OptionsHelper optionsHelper = new OptionsHelper(); + Options options = new Options(); + String topic, broker; + options.addOption(OPTION_TOPIC); + options.addOption(OPTION_BROKER); + options.addOption(OPTION_DELAY); + optionsHelper.parseOptions(options, args); + + logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); + + topic = optionsHelper.getOptionValue(OPTION_TOPIC); + broker = optionsHelper.getOptionValue(OPTION_BROKER); + long delay = 0; + String delayString = optionsHelper.getOptionValue(OPTION_DELAY); + if (delayString != null) { + delay = Long.parseLong(optionsHelper.getOptionValue(OPTION_DELAY)); + } + + List<String> countries = new ArrayList(); + countries.add("AUSTRALIA"); + countries.add("CANADA"); + countries.add("CHINA"); + countries.add("INDIA"); + countries.add("JAPAN"); + countries.add("KOREA"); + countries.add("US"); + countries.add("Other"); + List<String> category = new ArrayList(); + category.add("BOOK"); + category.add("TOY"); + category.add("CLOTH"); + category.add("ELECTRONIC"); + category.add("Other"); + List<String> devices = new ArrayList(); + devices.add("iOS"); + devices.add("Windows"); + devices.add("Andriod"); + devices.add("Other"); + + Properties props = new Properties(); + props.put("metadata.broker.list", broker); + props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("request.required.acks", "1"); + + ProducerConfig config = new ProducerConfig(props); + + Producer<String, String> producer = new Producer<String, String>(config); + + boolean alive = true; + Random rnd = new Random(); + Map<String, Object> record = new HashMap(); + while (alive == true) { + record.put("order_time", (new Date().getTime() - delay)); + record.put("country", countries.get(rnd.nextInt(countries.size()))); + record.put("category", category.get(rnd.nextInt(category.size()))); + record.put("device", devices.get(rnd.nextInt(devices.size()))); + record.put("qty", rnd.nextInt(10)); + record.put("currency", "USD"); + record.put("amount", rnd.nextDouble() * 100); + KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); + System.out.println("Sending 1 message"); + producer.send(data); + Thread.sleep(2000); + } + producer.close(); + } + + +}