KYLIN-2131, fix class getName()

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/656e9084
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/656e9084
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/656e9084

Branch: refs/heads/KYLIN-2131
Commit: 656e90848d79c4e34000cbb6a41f2c5387cfa76d
Parents: ffca41b
Author: Billy Liu <billy...@apache.org>
Authored: Fri Dec 16 11:53:43 2016 +0800
Committer: Billy Liu <billy...@apache.org>
Committed: Sun Dec 18 14:15:13 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/streaming/Kafka10DataLoader.java  | 5 +++--
 .../main/java/org/apache/kylin/source/kafka/KafkaSource.java    | 4 ++--
 .../java/org/apache/kylin/source/kafka/util/KafkaClient.java    | 4 ++--
 3 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java 
b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
index c7a487a..fae81ce 100644
--- 
a/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
+++ 
b/assembly/src/test/java/org/apache/kylin/job/streaming/Kafka10DataLoader.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kylin.source.kafka.config.BrokerConfig;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -79,8 +80,8 @@ public class Kafka10DataLoader extends StreamDataLoader {
         Properties props = new Properties();
         props.put("retry.backoff.ms", "1000");
         props.put("bootstrap.servers", brokers);
-        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
         props.put("acks", "1");
         props.put("buffer.memory", 33554432);
         props.put("retries", 0);

http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
index 1f3c446..6c1ac1f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java
@@ -85,8 +85,8 @@ public class KafkaSource implements ISource {
                 logger.debug("Last segment doesn't exist, use the start offset 
that be initiated previously: " + 
cube.getDescriptor().getPartitionOffsetStart());
                 
result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart());
             } else {
-                // from the topic's very begining;
-                logger.debug("Last segment doesn't exist, and didn't initiate 
the start offset, will seek from topic's very beginning.");
+                // from the topic's earliest offset;
+                logger.debug("Last segment doesn't exist, and didn't initiate 
the start offset, will seek from topic's earliest offset.");
                 
result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/656e9084/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
index f891467..51339c7 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java
@@ -56,8 +56,8 @@ public class KafkaClient {
             }
         }
         props.put("bootstrap.servers", brokers);
-        props.put("key.deserializer", 
StringDeserializer.class.getClass().getCanonicalName());
-        props.put("value.deserializer", 
StringDeserializer.class.getClass().getCanonicalName());
+        props.put("key.deserializer", StringDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
         props.put("group.id", consumerGroup);
         props.put("enable.auto.commit", "false");
         return props;

Reply via email to