KYLIN-2131, fix class getName()
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/656e9084 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/656e9084 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/656e9084 Branch: refs/heads/KYLIN-2131 Commit: 656e90848d79c4e34000cbb6a41f2c5387cfa76d Parents: ffca41b Author: Billy Liu <billy...@apache.org> Authored: Fri Dec 16 11:53:43 2016 +0800 Committer: Billy Liu <billy...@apache.org> Committed: Sun Dec 18 14:15:13 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/streaming/Kafka10DataLoader.java | 5 +++-- .../main/java/org/apache/kylin/source/kafka/KafkaSource.java | 4 ++-- .../java/org/apache/kylin/source/kafka/util/KafkaClient.java | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java index c7a487a..fae81ce 100644 --- a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java +++ b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kylin.source.kafka.config.BrokerConfig; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -79,8 +80,8 @@ public class Kafka10DataLoader extends StreamDataLoader { Properties props = new Properties(); props.put("retry.backoff.ms", "1000"); props.put("bootstrap.servers", brokers); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); props.put("acks", "1"); props.put("buffer.memory", 33554432); props.put("retries", 0); http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 1f3c446..6c1ac1f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -85,8 +85,8 @@ public class KafkaSource implements ISource { logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart()); result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart()); } else { - // from the topic's very begining; - logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning."); + // from the topic's earliest offset; + logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset."); result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube)); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index f891467..51339c7 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -56,8 +56,8 @@ public class KafkaClient { } } props.put("bootstrap.servers", brokers); - props.put("key.deserializer", StringDeserializer.class.getClass().getCanonicalName()); - props.put("value.deserializer", StringDeserializer.class.getClass().getCanonicalName()); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); props.put("group.id", consumerGroup); props.put("enable.auto.commit", "false"); return props;