[
https://issues.apache.org/jira/browse/ATLAS-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897998#comment-15897998
]
Greg Senia commented on ATLAS-1647:
-----------------------------------
I think the solution is to possibly wait for Kafka JIRA - [KAFKA-3199]
> AtlasHook does not work with Oozie Sqoop Action or with Original HiveAction
> ---------------------------------------------------------------------------
>
> Key: ATLAS-1647
> URL: https://issues.apache.org/jira/browse/ATLAS-1647
> Project: Atlas
> Issue Type: Bug
> Reporter: Greg Senia
> Fix For: 0.6-incubating, 0.7-incubating, 0.8-incubating,
> 0.7.1-incubating
>
>
> Doing some testing with Atlas 0.7.x AtlasHook does not place messages onto
> the Kafka queues correctly when the SqoopAction or HiveAction executes from
> within a secure Oozie Context because the job is running within the cluster
> and is using Delegation Token's which can be turned back into a UGI context
> by doing a UserGroupInformation.loginUserFromSubject. Problem is Kafka does
> not support UGI or Java Subject...
> AtlasHook class:
> if (!(isLoginKeytabBased())){
> if (isLoginTicketBased()) {
>
> InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient",
> "ticketBased-KafkaClient");
> LOG.info("TicketBased=true Kafka");
> } else {
> LOG.info("TicketBased=false and KeyTabBased=false Kafka");
> AccessControlContext context = AccessController.getContext();
> Subject subject = Subject.getSubject(context);
> if (subject == null) {
> LOG.info("No Subject Available");
> } else {
> try {
>
> UserGroupInformation.loginUserFromSubject(subject);
> Example of log output showing debug from Oozie Sqoop Action:
> 1 [main] INFO org.apache.sqoop.mapreduce.ImportJobBase - Publishing
> Hive/Hcat import job data to Listeners
> 33181 [main] INFO org.apache.sqoop.mapreduce.ImportJobBase - Publishing
> Hive/Hcat import job data to Listeners
> 33196 [main] INFO org.apache.atlas.ApplicationProperties - Looking for
> atlas-application.properties in classpath
> 33196 [main] INFO org.apache.atlas.ApplicationProperties - Loading
> atlas-application.properties from
> file:/gss/hadoop/diska/hadoop/yarn/local/usercache/gss2002/appcache/application_1488823620014_0005/container_e135_1488823620014_0005_01_000002/atlas-application.properties
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties - Configuration
> loaded:
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.authentication.method.kerberos = True
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.cluster.name = tech
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.keepAliveTime = 10
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.maxThreads = 5
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.minThreads = 5
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.numRetries = 3
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.queueSize = 1000
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.hook.hive.synchronous = false
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.loginModuleControlFlag = required
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.loginModuleName =
> com.sun.security.auth.module.Krb5LoginModule
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.option.renewTicket = True
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.option.serviceName = kafka
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.option.storeKey = false
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.option.useKeyTab = false
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.jaas.KafkaClient.option.useTicketCache = True
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.bootstrap.servers = ha21t55mn.tech.hdp.example.com:6667
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.hook.group.id = atlas
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.sasl.kerberos.service.name = kafka
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.security.protocol = PLAINTEXTSASL
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.zookeeper.connect = [ha21t53mn.tech.hdp.example.com:2181,
> ha21t51mn.tech.hdp.example.com:2181, ha21t52mn.tech.hdp.example.com:2181]
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.zookeeper.connection.timeout.ms = 200
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.zookeeper.session.timeout.ms = 400
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.kafka.zookeeper.sync.time.ms = 20
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.notification.create.topics = True
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.notification.replicas = 1
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.notification.topics = [ATLAS_HOOK, ATLAS_ENTITIES]
> 33214 [main] DEBUG org.apache.atlas.ApplicationProperties -
> atlas.rest.address = http://ha21t55mn.tech.hdp.example.com:21000
> 33215 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.init()
> 33217 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.init()
> 33220 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.initialize()
> 33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration -
> Adding client: [KafkaClient{-1}]
> loginModule: [com.sun.security.auth.module.Krb5LoginModule]
> controlFlag: [LoginModuleControlFlag: required]
> Options: [storeKey] => [false]
> Options: [renewTicket] => [True]
> Options: [useKeyTab] => [false]
> Options: [serviceName] => [kafka]
> Options: [useTicketCache] => [True]
> 33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.initialize({KafkaClient=[javax.security.auth.login.AppConfigurationEntry@669c2b07]})
> 33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.init()
> 33223 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.init()
> 33239 [main] INFO org.apache.atlas.hook.AtlasHook - gss TicketBased=false
> and KeyTabBased=false Kafka
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - hadoop
> login
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - hadoop
> login commit
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - using
> existing subject:[gss2002, UnixPrincipal: gss2002, UnixNumericUserPrincipal:
> 190186246, UnixNumericGroupPrincipal [Primary Group]: 190000513,
> UnixNumericGroupPrincipal [Supplementary Group]: 190172138,
> UnixNumericGroupPrincipal [Supplementary Group]: 190172480,
> UnixNumericGroupPrincipal [Supplementary Group]: 190179404,
> UnixNumericGroupPrincipal [Supplementary Group]: 190180058,
> UnixNumericGroupPrincipal [Supplementary Group]: 190180097,
> UnixNumericGroupPrincipal [Supplementary Group]: 190180140,
> UnixNumericGroupPrincipal [Supplementary Group]: 190190874]
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation -
> Assuming keytab is managed externally since logged in from subject.
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - Reading
> credentials from location set in HADOOP_TOKEN_FILE_LOCATION:
> /gss/hadoop/diska/hadoop/yarn/local/usercache/gss2002/appcache/application_1488823620014_0005/container_e135_1488823620014_0005_01_000002/container_tokens
> 33240 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - Loaded
> 6 tokens
> 33241 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - UGI
> loginUser:gss2002 (auth:KERBEROS)
> 33435 [main] INFO org.apache.atlas.hook.AtlasHook - Created Atlas Hook
> 34062 [IPC Client (1267105885) connection to /10.70.41.7:43513 from
> job_1488823620014_0005] DEBUG org.apache.hadoop.security.SaslRpcClient -
> reading next wrapped RPC packet
> 34062 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client -
> IPC Client (1267105885) connection to /10.70.41.7:43513 from
> job_1488823620014_0005 sending #1563
> 34062 [IPC Parameter Sending Thread #0] DEBUG
> org.apache.hadoop.security.SaslRpcClient - wrapping token of length:264
> 34063 [IPC Client (1267105885) connection to /10.70.41.7:43513 from
> job_1488823620014_0005] DEBUG org.apache.hadoop.security.SaslRpcClient -
> unwrapping token of length:62
> 34063 [IPC Client (1267105885) connection to /10.70.41.7:43513 from
> job_1488823620014_0005] DEBUG org.apache.hadoop.ipc.Client - IPC Client
> (1267105885) connection to /10.70.41.7:43513 from job_1488823620014_0005 got
> value #1563
> 34063 [communication thread] DEBUG org.apache.hadoop.ipc.RPC - Call: ping 2
> 34435 [main] INFO org.apache.kafka.clients.producer.ProducerConfig -
> ProducerConfig values:
> metric.reporters = []
> Failure Snippit:
> 36504 [main] ERROR org.apache.atlas.hook.AtlasHook - Failed to notify atlas
> for entity [[{Id='(type: sqoop_dbdatastore, id: <unassigned>)', traits=[],
> values={owner=gss2002, storeUri=jdbc:oracle:thin:
> Excluded secure information
> , storeUse=TABLE}}, name=sqoop
> excluded secure information
> --hive-cluster tech, startTime=Mon Mar 06 14:32:22 EST 2017, endTime=Mon Mar
> 06 14:32:51 EST 2017, userName=gss2002, operation=import}}]] after 3 retries.
> Quitting
> 35491 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor
> with name bufferpool-wait-time
> 35491 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor
> with name buffer-exhausted-records
> 35491 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster
> metadata version 1 to Cluster(nodes = [ha21t55mn.tech.hdp.example.com:6667
> (id: -1 rack: null)], partitions = [])
> 35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
> 35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient):
> {javax.security.auth.login.AppConfigurationEntry@669c2b07}
> 35491 [main] DEBUG
> org.apache.kafka.common.security.authenticator.AbstractLogin - System
> property 'java.security.auth.login.config' is not set, using default JAAS
> configuration.
> 35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
> 35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient):
> {javax.security.auth.login.AppConfigurationEntry@669c2b07}
> 35491 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - ==>
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient)
> 35492 [main] DEBUG org.apache.atlas.security.InMemoryJAASConfiguration - <==
> InMemoryJAASConfiguration.getAppConfigurationEntry(KafkaClient):
> {javax.security.auth.login.AppConfigurationEntry@669c2b07}
> 35492 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing
> the Kafka producer with timeoutMillis = 0 ms.
> 35492 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The
> Kafka producer has closed.
> 35492 [main] ERROR org.apache.atlas.hook.AtlasHook - Failed to send
> notification - attempt #2; error=Failed to construct kafka producer
> 35492 [main] DEBUG org.apache.atlas.hook.AtlasHook - Sleeping for 1000 ms
> before retry
> 36501 [main] INFO org.apache.kafka.clients.producer.ProducerConfig -
> ProducerConfig values:
> metric.reporters = []
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
> at
> org.apache.atlas.kafka.KafkaNotification.createProducer(KafkaNotification.java:311)
> at
> org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:220)
> at
> org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84)
> at
> org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:158)
> at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:143)
> at org.apache.atlas.sqoop.hook.SqoopHook.publish(SqoopHook.java:177)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)