Repository: kylin Updated Branches: refs/heads/master c804dc8d7 -> 3047b53d6
KYLIN-1726 update sampleProducer and kylin.sh Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7a793e5c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7a793e5c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7a793e5c Branch: refs/heads/master Commit: 7a793e5c33f1f7bffd5a53d64ec92065abd5856a Parents: c804dc8 Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Oct 25 18:19:40 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Thu Oct 27 09:10:16 2016 +0800 ---------------------------------------------------------------------- build/bin/kylin.sh | 7 +++-- .../source/kafka/util/KafkaSampleProducer.java | 29 ++++++++++++-------- 2 files changed, 22 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index 9286055e..ad3a952 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -32,7 +32,6 @@ function retrieveDependency() { #retrive $hive_dependency and $hbase_dependency source ${dir}/find-hive-dependency.sh source ${dir}/find-hbase-dependency.sh - #source ${dir}/find-kafka-dependency.sh #retrive $KYLIN_EXTRA_START_OPTS if [ -f "${dir}/setenv.sh" ] @@ -41,7 +40,11 @@ function retrieveDependency() { export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX} export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency} - #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency} + if [ -n "$KAFKA_HOME" ] + then + source ${dir}/find-kafka-dependency.sh + export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${kafka_dependency} + fi } # start command http://git-wip-us.apache.org/repos/asf/kylin/blob/7a793e5c/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java index 3d26d3d..b8f98aa 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.UUID; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -48,7 +49,6 @@ public class KafkaSampleProducer { @SuppressWarnings("static-access") private static final Option OPTION_TOPIC = OptionBuilder.withArgName("topic").hasArg().isRequired(true).withDescription("Kafka topic").create("topic"); private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker"); - private static final Option OPTION_DELAY = OptionBuilder.withArgName("delay").hasArg().isRequired(false).withDescription("Simulated message delay in mili-seconds, default 0").create("delay"); private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval"); private static final ObjectMapper mapper = new ObjectMapper(); @@ -60,21 +60,14 @@ public class KafkaSampleProducer { String topic, broker; options.addOption(OPTION_TOPIC); options.addOption(OPTION_BROKER); - options.addOption(OPTION_DELAY); - options.addOption(OPTION_INTERVAL); optionsHelper.parseOptions(options, args); logger.info("options: '" + optionsHelper.getOptionsAsString() + "'"); topic = optionsHelper.getOptionValue(OPTION_TOPIC); broker = optionsHelper.getOptionValue(OPTION_BROKER); - long delay = 0; - String delayString = optionsHelper.getOptionValue(OPTION_DELAY); - if (delayString != null) { - delay = Long.parseLong(delayString); - } - long interval = 1000; + long interval = 10; String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL); if (intervalString != null) { interval = Long.parseLong(intervalString); @@ -101,6 +94,10 @@ public class KafkaSampleProducer { devices.add("Andriod"); devices.add("Other"); + List<String> genders = new ArrayList(); + genders.add("Male"); + genders.add("Female"); + Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("acks", "all"); @@ -117,15 +114,23 @@ public class KafkaSampleProducer { Random rnd = new Random(); Map<String, Object> record = new HashMap(); while (alive == true) { - record.put("order_time", (new Date().getTime() - delay)); + //add normal record + record.put("order_time", (new Date().getTime())); record.put("country", countries.get(rnd.nextInt(countries.size()))); record.put("category", category.get(rnd.nextInt(category.size()))); record.put("device", devices.get(rnd.nextInt(devices.size()))); record.put("qty", rnd.nextInt(10)); record.put("currency", "USD"); record.put("amount", rnd.nextDouble() * 100); - ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); - System.out.println("Sending 1 message"); + //add embedded record + Map<String, Object> user = new HashMap(); + user.put("id", UUID.randomUUID().toString()); + user.put("gender", genders.get(rnd.nextInt(2))); + user.put("age", rnd.nextInt(20) + 10); + record.put("user", user); + //send message + ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record)); + System.out.println("Sending 1 message: " + record.toString()); producer.send(data); Thread.sleep(interval); }