I found a solution for this error:

I had to copy the Hadoop installation directories from the Hadoop nodes to
all the Kafka Connect nodes as well.
Then export HADOOP_CONF_DIR in the .bashrc of the Kafka Connect nodes.
Finally, set hadoop.conf.dir in the REST POST request payload.
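
For illustration, the last step could look roughly like the sketch below. It only builds the JSON body for the Kafka Connect REST API and prints it. The connector name, topic, flush.size, tasks.max, the worker URL and the path /opt/hadoop/etc/hadoop are placeholders I made up for the example, not values from our setup. The directory given in hadoop.conf.dir is assumed to be the copied Hadoop configuration directory (the one holding core-site.xml and hdfs-site.xml).

```python
import json
import urllib.request


def build_hdfs_sink_payload(name, topics, nameservice, hadoop_conf_dir):
    """Build the JSON body for POST /connectors on a Kafka Connect worker."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
            "tasks.max": "1",                     # placeholder value
            "topics": topics,
            # Nameservice ID instead of a namenode host:port.
            "hdfs.url": "hdfs://" + nameservice,
            # Directory on the Connect node with the copied Hadoop config files.
            "hadoop.conf.dir": hadoop_conf_dir,
            "flush.size": "1000",                 # placeholder value
        },
    }


def create_connector(connect_url, payload):
    """POST the payload to the worker, e.g. connect_url='http://localhost:8083'."""
    request = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# All argument values below are hypothetical, except the nameservice ID.
payload = build_hdfs_sink_payload(
    name="hdfs-sink-example",
    topics="example_topic",
    nameservice="ha-cluster-dev1",
    hadoop_conf_dir="/opt/hadoop/etc/hadoop",
)
print(json.dumps(payload, indent=2))
```

create_connector is not called here, so the sketch runs without a Connect worker; pass it the worker URL and the payload to actually submit the request.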

Thanks
Nishant Verma

On Tue, Jul 4, 2017 at 10:20 PM, Nishant Verma wrote:

> I am not sure if this is the right forum for this query.
>
> HDFS is 2.7.3
> Kafka Connect is 3.2.0 and Kafka is 0.10.2.0.
>
> We are using the Kafka Connect HDFS Connector to write records from a Kafka
> topic to HDFS.
>
> Until yesterday, we had just one namenode in our cluster and we were using
> "hdfs.url" as "hdfs://abc.xyz.com:9000" in the REST POST request that starts
> the Kafka Connect HDFS Connector.
> That worked. The same URL was defined in core-site.xml in the
> fs.defaultFS property.
>
> We have now upgraded our cluster to an active-standby namenode (HA) setup
> and we use a nameservice instead of the HDFS node IP and port.
> We have updated fs.defaultFS in core-site.xml to
> hdfs://ha-cluster-dev1. We are able to switch between the active and
> standby namenodes.
> We are also able to list HDFS directories using hadoop fs -ls
> hdfs://ha-cluster-dev1/.
>
> We have updated our REST request to "hdfs.url":"hdfs://cfms-ha-cluster-dev1/"
> and the other properties in the payload are unchanged.
>
> Now, as soon as we send this REST POST request, all the tasks die
> immediately and the HDFS connector does not start. The log shows the error
> below:
>
> [2017-07-04 16:57:12,951] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:84)
> org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
>         at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:31)
>         at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:168)
>         at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:76)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.reflect.InvocationTargetException
>         at sun.reflect.GeneratedConstructorAccessor22.newInstance(Unknown Source)
>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
>         at io.confluent.connect.hdfs.storage.StorageFactory.createStorage(StorageFactory.java:29)
>         ... 11 more
> Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: ha-cluster-dev1
>         at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
>         at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>         at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>         at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>         at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>         at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>         at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
>         at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
>         at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
>         at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
>         at io.confluent.connect.hdfs.storage.HdfsStorage.<init>(HdfsStorage.java:39)
>         ... 15 more
> Caused by: java.net.UnknownHostException: ha-cluster-dev1
>         ... 27 more
> [2017-07-04 16:57:12,952] INFO Shutting down HdfsSinkConnector. (io.confluent.connect.hdfs.HdfsSinkTask:85)
>
> And all the tasks die with the following error trace:
>
> java.lang.NullPointerException
> at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:121)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:744)
>
> Clearly, the request expects hdfs.url to be a host:port combination,
> but my nameservice is not a host per se, hence the error.
>
> My HDFS namenodes and my Kafka Connect nodes run on different machines.
> How am I supposed to run Kafka Connect (or, more generally, any other
> external system) using the nameservice ID and fix this error?
>
> Thanks
>
