KYLIN-1435: Relax the checking for PartitionMetadata and logger the error code
Signed-off-by: honma <ho...@ebay.com> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f93bec1 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f93bec1 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f93bec1 Branch: refs/heads/1.4-rc Commit: 1f93bec1ca14fe0bdc2943fa9000404401f0acf5 Parents: f18f7b0 Author: yangzhong <yangzh...@ebay.com> Authored: Tue Feb 23 17:37:11 2016 +0800 Committer: honma <ho...@ebay.com> Committed: Tue Feb 23 18:22:08 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/source/kafka/KafkaStreamingInput.java | 5 ++++- .../java/org/apache/kylin/source/kafka/util/KafkaUtils.java | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1f93bec1/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index ee5a555..bcde47b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -123,7 +123,10 @@ public class KafkaStreamingInput implements IStreamingInput { private Broker getLeadBroker() { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, replicaBrokers, kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } replicaBrokers = partitionMetadata.replicas(); return partitionMetadata.leader(); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/1f93bec1/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index ab54abb..f506999 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -51,7 +51,10 @@ public final class KafkaUtils { public static Broker getLeadBroker(KafkaClusterConfig kafkaClusterConfig, int partitionId) { final PartitionMetadata partitionMetadata = KafkaRequester.getPartitionMetadata(kafkaClusterConfig.getTopic(), partitionId, kafkaClusterConfig.getBrokers(), kafkaClusterConfig); - if (partitionMetadata != null && partitionMetadata.errorCode() == 0) { + if (partitionMetadata != null) { + if (partitionMetadata.errorCode() != 0){ + logger.warn("PartitionMetadata errorCode: "+partitionMetadata.errorCode()); + } return partitionMetadata.leader(); } else { return null;