http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
index f85505d..150f0d3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandler.java
@@ -24,8 +24,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
 import org.apache.accumulo.server.security.UserImpersonation;
@@ -81,6 +83,19 @@ public class TCredentialsUpdatingInvocationHandler<I> 
implements InvocationHandl
     }
 
     Class<? extends AuthenticationToken> tokenClass = 
getTokenClassFromName(tcreds.tokenClassName);
+
+    // The Accumulo principal extracted from the SASL transport
+    final String principal = UGIAssumingProcessor.rpcPrincipal();
+
+    // If we authenticated the user over DIGEST-MD5 and they have a 
DelegationToken, the principals should match
+    if (SaslMechanism.DIGEST_MD5 == UGIAssumingProcessor.rpcMechanism() && 
DelegationToken.class.isAssignableFrom(tokenClass)) {
+      if (!principal.equals(tcreds.principal)) {
+        log.warn("{} issued RPC with delegation token over DIGEST-MD5 as the 
Accumulo principal {}. Disallowing RPC", principal, tcreds.principal);
+        throw new ThriftSecurityException("RPC principal did not match 
provided Accumulo principal", SecurityErrorCode.BAD_CREDENTIALS);
+      }
+      return;
+    }
+
     // If the authentication token isn't a KerberosToken
     if (!KerberosToken.class.isAssignableFrom(tokenClass) && 
!SystemToken.class.isAssignableFrom(tokenClass)) {
       // Don't include messages about SystemToken since it's internal
@@ -88,9 +103,6 @@ public class TCredentialsUpdatingInvocationHandler<I> 
implements InvocationHandl
       throw new ThriftSecurityException("Did not receive a valid token", 
SecurityErrorCode.BAD_CREDENTIALS);
     }
 
-    // The Accumulo principal extracted from the SASL transport
-    final String principal = UGIAssumingProcessor.rpcPrincipal();
-
     if (null == principal) {
       log.debug("Found KerberosToken in TCredentials, but did not receive 
principal from SASL processor");
       throw new ThriftSecurityException("Did not extract principal from Thrift 
SASL processor", SecurityErrorCode.BAD_CREDENTIALS);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index f1f8963..558b02e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -34,7 +34,6 @@ import javax.net.ssl.SSLServerSocket;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
@@ -150,7 +149,7 @@ public class TServerUtils {
         try {
           HostAndPort addr = HostAndPort.fromParts(hostname, port);
           return TServerUtils.startTServer(addr, serverType, timedProcessor, 
serverName, threadName, minThreads, simpleTimerThreadpoolSize,
-              timeBetweenThreadChecks, maxMessageSize, 
service.getServerSslParams(), service.getServerSaslParams(), 
service.getClientTimeoutInMillis());
+              timeBetweenThreadChecks, maxMessageSize, 
service.getServerSslParams(), service.getSaslParams(), 
service.getClientTimeoutInMillis());
         } catch (TTransportException ex) {
           log.error("Unable to start TServer", ex);
           if (ex.getCause() == null || ex.getCause().getClass() == 
BindException.class) {
@@ -380,7 +379,7 @@ public class TServerUtils {
   }
 
   public static ServerAddress createSaslThreadPoolServer(HostAndPort address, 
TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout,
-      SaslConnectionParams params, final String serverName, String threadName, 
final int numThreads, final int numSTThreads, long timeBetweenThreadChecks)
+      SaslServerConnectionParams params, final String serverName, String 
threadName, final int numThreads, final int numSTThreads, long 
timeBetweenThreadChecks)
       throws TTransportException {
     // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 
Thread that the TThreadPoolServer does,
     // but sadly this isn't the case. Because TSaslTransport needs to issue a 
handshake when it open()'s which will fail
@@ -388,7 +387,7 @@ public class TServerUtils {
     log.info("Creating SASL thread pool thrift server on listening on {}:{}", 
address.getHostText(), address.getPort());
     TServerSocket transport = new TServerSocket(address.getPort(), (int) 
socketTimeout);
 
-    final String hostname, fqdn;
+    String hostname, fqdn;
     try {
       hostname = 
InetAddress.getByName(address.getHostText()).getCanonicalHostName();
       fqdn = InetAddress.getLocalHost().getCanonicalHostName();
@@ -396,10 +395,15 @@ public class TServerUtils {
       throw new TTransportException(e);
     }
 
+    // If we can't get a real hostname from the provided host text, use the 
hostname from DNS for localhost
+    if ("0.0.0.0".equals(hostname)) {
+      hostname = fqdn;
+    }
+
     // ACCUMULO-3497 an easy sanity check we can perform for the user when 
SASL is enabled. Clients and servers have to agree upon the FQDN
     // so that the SASL handshake can occur. If the provided hostname doesn't 
match the FQDN for this host, fail quickly and inform them to update
     // their configuration.
-    if (!"0.0.0.0".equals(hostname) && !hostname.equals(fqdn)) {
+    if (!hostname.equals(fqdn)) {
       log.error(
           "Expected hostname of '{}' but got '{}'. Ensure the entries in the 
Accumulo hosts files (e.g. masters, slaves) are the FQDN for each host when 
using SASL.",
           fqdn, hostname);
@@ -413,7 +417,7 @@ public class TServerUtils {
       throw new TTransportException(e);
     }
 
-    log.trace("Logged in as {}, creating TSsaslServerTransport factory as 
{}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
+    log.debug("Logged in as {}, creating TSaslServerTransport factory with 
{}/{}", serverUser, params.getKerberosServerPrimary(), hostname);
 
     // Make the SASL transport factory with the instance and primary from the 
kerberos server principal, SASL properties
     // and the SASL callback handler from Hadoop to ensure authorization ID is 
the authentication ID. Despite the 'protocol' argument seeming to be useless, it
@@ -422,6 +426,14 @@ public class TServerUtils {
     saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, 
params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
         new SaslRpcServer.SaslGssCallbackHandler());
 
+    if (null != params.getSecretManager()) {
+      log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+      saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, 
params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+          new SaslServerDigestCallbackHandler(params.getSecretManager()));
+    } else {
+      log.info("SecretManager is null, not adding support for delegation token 
authentication");
+    }
+
     // Make sure the TTransportFactory is performing a UGI.doAs
     TTransportFactory ugiTransportFactory = new 
UGIAssumingTransportFactory(saslTransportFactory, serverUser);
 
@@ -440,7 +452,7 @@ public class TServerUtils {
 
   public static ServerAddress startTServer(AccumuloConfiguration conf, 
HostAndPort address, ThriftServerType serverType, TProcessor processor,
       String serverName, String threadName, int numThreads, int numSTThreads, 
long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams,
-      SaslConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
+      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
 
     if (ThriftServerType.SASL == serverType) {
       processor = updateSaslProcessor(serverType, processor);
@@ -452,11 +464,11 @@ public class TServerUtils {
 
   /**
    * @see #startTServer(HostAndPort, ThriftServerType, TimedProcessor, 
TProtocolFactory, String, String, int, int, long, long, SslConnectionParams,
-   *      SaslConnectionParams, long)
+   *      SaslServerConnectionParams, long)
    */
   public static ServerAddress startTServer(HostAndPort address, 
ThriftServerType serverType, TimedProcessor processor, String serverName, 
String threadName,
-      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long 
maxMessageSize, SslConnectionParams sslParams, SaslConnectionParams saslParams,
-      long serverSocketTimeout) throws TTransportException {
+      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long 
maxMessageSize, SslConnectionParams sslParams,
+      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
     return startTServer(address, serverType, processor, 
ThriftUtil.protocolFactory(), serverName, threadName, numThreads, numSTThreads,
         timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, 
serverSocketTimeout);
   }
@@ -468,7 +480,7 @@ public class TServerUtils {
    */
   public static ServerAddress startTServer(HostAndPort address, 
ThriftServerType serverType, TimedProcessor processor, TProtocolFactory 
protocolFactory,
       String serverName, String threadName, int numThreads, int numSTThreads, 
long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams,
-      SaslConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
+      SaslServerConnectionParams saslParams, long serverSocketTimeout) throws 
TTransportException {
 
     // This is presently not supported. It's hypothetically possible, I 
believe, to work, but it would require changes in how the transports
     // work at the Thrift layer to ensure that both the SSL and SASL 
handshakes function. SASL's quality of protection addresses privacy issues.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
index ab106a6..48d18f4 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/UGIAssumingProcessor.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import javax.security.sasl.SaslServer;
 
+import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -40,6 +41,8 @@ public class UGIAssumingProcessor implements TProcessor {
   private static final Logger log = 
LoggerFactory.getLogger(UGIAssumingProcessor.class);
 
   public static final ThreadLocal<String> rpcPrincipal = new 
ThreadLocal<String>();
+  public static final ThreadLocal<SaslMechanism> rpcMechanism = new 
ThreadLocal<SaslMechanism>();
+
   private final TProcessor wrapped;
   private final UserGroupInformation loginUser;
 
@@ -60,6 +63,14 @@ public class UGIAssumingProcessor implements TProcessor {
     return rpcPrincipal.get();
   }
 
+  public static ThreadLocal<String> getRpcPrincipalThreadLocal() {
+    return rpcPrincipal;
+  }
+
+  public static SaslMechanism rpcMechanism() {
+    return rpcMechanism.get();
+  }
+
   @Override
   public boolean process(final TProtocol inProt, final TProtocol outProt) 
throws TException {
     TTransport trans = inProt.getTransport();
@@ -71,20 +82,42 @@ public class UGIAssumingProcessor implements TProcessor {
     String authId = saslServer.getAuthorizationID();
     String endUser = authId;
 
-    log.trace("Received SASL RPC from {}", endUser);
+    SaslMechanism mechanism;
+    try {
+      mechanism = SaslMechanism.get(saslServer.getMechanismName());
+    } catch (Exception e) {
+      log.error("Failed to process RPC with SASL mechanism {}", 
saslServer.getMechanismName());
+      throw e;
+    }
 
-    UserGroupInformation clientUgi = 
UserGroupInformation.createProxyUser(endUser, loginUser);
-    final String remoteUser = clientUgi.getUserName();
+    switch (mechanism) {
+      case GSSAPI:
+        UserGroupInformation clientUgi = 
UserGroupInformation.createProxyUser(endUser, loginUser);
+        final String remoteUser = clientUgi.getUserName();
 
-    try {
-      // Set the principal in the ThreadLocal for access to get authorizations
-      rpcPrincipal.set(remoteUser);
+        try {
+          // Set the principal in the ThreadLocal for access to get 
authorizations
+          rpcPrincipal.set(remoteUser);
 
-      return wrapped.process(inProt, outProt);
-    } finally {
-      // Unset the principal after we're done using it just to be sure that 
it's not incorrectly
-      // used in the same thread down the line.
-      rpcPrincipal.set(null);
+          return wrapped.process(inProt, outProt);
+        } finally {
+          // Unset the principal after we're done using it just to be sure 
that it's not incorrectly
+          // used in the same thread down the line.
+          rpcPrincipal.set(null);
+        }
+      case DIGEST_MD5:
+        // The CallbackHandler, after deserializing the TokenIdentifier in the 
name, has already updated
+        // the rpcPrincipal for us. We don't need to do it again here.
+        try {
+          rpcMechanism.set(mechanism);
+          return wrapped.process(inProt, outProt);
+        } finally {
+          // Unset the mechanism after we're done using it just to be sure 
that it's not incorrectly
+          // used in the same thread down the line.
+          rpcMechanism.set(null);
+        }
+      default:
+        throw new IllegalArgumentException("Cannot process SASL mechanism " + 
mechanism);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index cc7a7cd..283cba3 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -454,4 +454,18 @@ public class AuditedSecurityOperation extends 
SecurityOperation {
       throw e;
     }
   }
+
+  public static final String DELEGATION_TOKEN_AUDIT_TEMPLATE = "requested 
delegation token";
+
+  @Override
+  public boolean canObtainDelegationToken(TCredentials credentials) throws 
ThriftSecurityException {
+    try {
+      boolean result = super.canObtainDelegationToken(credentials);
+      audit(credentials, result, DELEGATION_TOKEN_AUDIT_TEMPLATE);
+      return result;
+    } catch (ThriftSecurityException e) {
+      audit(credentials, false, DELEGATION_TOKEN_AUDIT_TEMPLATE);
+      throw e;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index 7adb46e..0b0f212 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -830,4 +830,8 @@ public class SecurityOperation {
     return hasSystemPermissionWithNamespaceId(credentials, 
SystemPermission.ALTER_NAMESPACE, namespaceId, false);
   }
 
+  public boolean canObtainDelegationToken(TCredentials credentials) throws 
ThriftSecurityException {
+    authenticate(credentials);
+    return hasSystemPermission(credentials, 
SystemPermission.OBTAIN_DELEGATION_TOKEN, false);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
index 51d50a1..6a915c6 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/SystemCredentials.java
@@ -33,7 +33,6 @@ import 
org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.Base64;
@@ -69,8 +68,7 @@ public final class SystemCredentials extends Credentials {
     check_permission();
     String principal = SYSTEM_PRINCIPAL;
     AccumuloConfiguration conf = SiteConfiguration.getInstance();
-    SaslConnectionParams saslParams = SaslConnectionParams.forConfig(conf);
-    if (null != saslParams) {
+    if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
       // Use the server's kerberos principal as the Accumulo principal. We 
could also unwrap the principal server-side, but the principal for 
SystemCredentials
       // isnt' actually used anywhere, so it really doesn't matter. We can't 
include the kerberos principal in the SystemToken as it would break equality 
when
       // different Accumulo servers are using different kerberos principals 
are their accumulo principal

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java
new file mode 100644
index 0000000..134502a
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationKey.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.security.delegation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.SecretKey;
+
+import org.apache.accumulo.core.security.thrift.TAuthenticationKey;
+import org.apache.accumulo.core.util.ThriftMessageUtil;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Represents a secret key used for signing and verifying authentication 
tokens by {@link AuthenticationTokenSecretManager}.
+ */
+public class AuthenticationKey implements Writable {
+  private TAuthenticationKey authKey;
+  private SecretKey secret;
+
+  public AuthenticationKey() {
+    // for Writable
+  }
+
+  public AuthenticationKey(int keyId, long creationDate, long expirationDate, 
SecretKey key) {
+    checkNotNull(key);
+    authKey = new TAuthenticationKey(ByteBuffer.wrap(key.getEncoded()));
+    authKey.setCreationDate(creationDate);
+    authKey.setKeyId(keyId);
+    authKey.setExpirationDate(expirationDate);
+    this.secret = key;
+  }
+
+  public int getKeyId() {
+    checkNotNull(authKey);
+    return authKey.getKeyId();
+  }
+
+  public long getCreationDate() {
+    checkNotNull(authKey);
+    return authKey.getCreationDate();
+  }
+
+  public void setCreationDate(long creationDate) {
+    checkNotNull(authKey);
+    authKey.setCreationDate(creationDate);
+  }
+
+  public long getExpirationDate() {
+    checkNotNull(authKey);
+    return authKey.getExpirationDate();
+  }
+
+  public void setExpirationDate(long expirationDate) {
+    checkNotNull(authKey);
+    authKey.setExpirationDate(expirationDate);
+  }
+
+  SecretKey getKey() {
+    return secret;
+  }
+
+  void setKey(SecretKey secret) {
+    this.secret = secret;
+  }
+
+  @Override
+  public int hashCode() {
+    if (null == authKey) {
+      return 1;
+    }
+    HashCodeBuilder hcb = new HashCodeBuilder(29, 31);
+    
hcb.append(authKey.getKeyId()).append(authKey.getExpirationDate()).append(authKey.getCreationDate()).append(secret.getEncoded());
+    return hcb.toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof AuthenticationKey)) {
+      return false;
+    }
+    AuthenticationKey other = (AuthenticationKey) obj;
+    // authKey might be null due to writable nature
+    if (null == authKey && null != other.authKey) {
+      return false;
+    }
+    return authKey.equals(other.authKey);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("AuthenticationKey[");
+    if (null == authKey) {
+      buf.append("null]");
+    } else {
+      buf.append("id=").append(authKey.getKeyId()).append(", 
expiration=").append(authKey.getExpirationDate()).append(", creation=")
+          .append(authKey.getCreationDate()).append("]");
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (null == authKey) {
+      WritableUtils.writeVInt(out, 0);
+      return;
+    }
+    ThriftMessageUtil util = new ThriftMessageUtil();
+    ByteBuffer serialized = util.serialize(authKey);
+    WritableUtils.writeVInt(out, serialized.limit() - 
serialized.arrayOffset());
+    out.write(serialized.array(), serialized.arrayOffset(), 
serialized.limit());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int length = WritableUtils.readVInt(in);
+    if (0 == length) {
+      return;
+    }
+
+    ThriftMessageUtil util = new ThriftMessageUtil();
+    byte[] bytes = new byte[length];
+    in.readFully(bytes);
+    authKey = util.deserialize(bytes, new TAuthenticationKey());
+    secret = 
AuthenticationTokenSecretManager.createSecretKey(authKey.getSecret());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java
new file mode 100644
index 0000000..3582cfd
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.delegation;
+
+import java.util.List;
+
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Service that handles generation of the secret key used to create delegation 
tokens.
+ */
+public class AuthenticationTokenKeyManager extends Daemon {
+  private static final Logger log = 
LoggerFactory.getLogger(AuthenticationTokenKeyManager.class);
+
+  private final AuthenticationTokenSecretManager secretManager;
+  private final ZooAuthenticationKeyDistributor keyDistributor;
+
+  private long lastKeyUpdate = 0;
+  private long keyUpdateInterval;
+  private long tokenMaxLifetime;
+  private int idSeq = 0;
+  private volatile boolean keepRunning = true, initialized = false;
+
+  /**
+   * Construct the key manager which will generate new AuthenticationKeys to 
generate and verify delegation tokens
+   *
+   * @param mgr
+   *          The SecretManager in use
+   * @param dist
+   *          The implementation to distribute AuthenticationKeys to ZooKeeper
+   * @param keyUpdateInterval
+   *          The frequency, in milliseconds, that new AuthenticationKeys are 
created
+   * @param tokenMaxLifetime
+   *          The lifetime, in milliseconds, of generated AuthenticationKeys 
(and subsequently delegation tokens).
+   */
+  public AuthenticationTokenKeyManager(AuthenticationTokenSecretManager mgr, 
ZooAuthenticationKeyDistributor dist, long keyUpdateInterval,
+      long tokenMaxLifetime) {
+    super("Delegation Token Key Manager");
+    this.secretManager = mgr;
+    this.keyDistributor = dist;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenMaxLifetime = tokenMaxLifetime;
+  }
+
+  @VisibleForTesting
+  void setKeepRunning(boolean keepRunning) {
+    this.keepRunning = keepRunning;
+  }
+
+  public boolean isInitialized() {
+    return initialized;
+  }
+
+  public void gracefulStop() {
+    keepRunning = false;
+  }
+
+  @Override
+  public void run() {
+    // Make sure to initialize the secret manager with keys already in ZK
+    updateStateFromCurrentKeys();
+    initialized = true;
+
+    while (keepRunning) {
+      long now = System.currentTimeMillis();
+
+      _run(now);
+
+      try {
+        Thread.sleep(5000);
+      } catch (InterruptedException ie) {
+        log.debug("Interrupted waiting for next update", ie);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void updateStateFromCurrentKeys() {
+    try {
+      List<AuthenticationKey> currentKeys = keyDistributor.getCurrentKeys();
+      if (!currentKeys.isEmpty()) {
+        for (AuthenticationKey key : currentKeys) {
+          // Ensure that we don't create new Keys with duplicate keyIds for 
keys that already exist
+          // It's not a big concern if we happen to duplicate keyIds for 
already expired keys.
+          if (key.getKeyId() > idSeq) {
+            idSeq = key.getKeyId();
+          }
+          secretManager.addKey(key);
+        }
+        log.info("Added {} existing AuthenticationKeys into the local cache 
from ZooKeeper", currentKeys.size());
+
+        // Try to use the last key instead of creating a new one right away. 
This will present more expected
+        // functionality if the active master happens to die for some reason
+        AuthenticationKey currentKey = secretManager.getCurrentKey();
+        if (null != currentKey) {
+          log.info("Updating last key update to {} from current secret manager 
key", currentKey.getCreationDate());
+          lastKeyUpdate = currentKey.getCreationDate();
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Failed to fetch existing AuthenticationKeys from ZooKeeper");
+    }
+  }
+
+  @VisibleForTesting
+  long getLastKeyUpdate() {
+    return lastKeyUpdate;
+  }
+
+  @VisibleForTesting
+  int getIdSeq() {
+    return idSeq;
+  }
+
+  /**
+   * Internal "run" method which performs the actual work.
+   *
+   * @param now
+   *          The current time in millis since epoch.
+   */
+  void _run(long now) {
+    // clear any expired keys
+    int removedKeys = secretManager.removeExpiredKeys(keyDistributor);
+    if (removedKeys > 0) {
+      log.debug("Removed {} expired keys from the local cache", removedKeys);
+    }
+
+    if (lastKeyUpdate + keyUpdateInterval < now) {
+      log.debug("Key update interval passed, creating new authentication key");
+
+      // Increment the idSeq and use the new value as the unique ID
+      AuthenticationKey newKey = new AuthenticationKey(++idSeq, now, now + 
tokenMaxLifetime, secretManager.generateSecret());
+
+      log.debug("Created new {}", newKey.toString());
+
+      // Will set to be the current key given the idSeq
+      secretManager.addKey(newKey);
+
+      // advertise it to tabletservers
+      try {
+        keyDistributor.advertise(newKey);
+      } catch (KeeperException | InterruptedException e) {
+        log.error("Failed to advertise AuthenticationKey in ZooKeeper. 
Exiting.", e);
+        throw new RuntimeException(e);
+      }
+
+      lastKeyUpdate = now;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java
new file mode 100644
index 0000000..99173d2
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenSecretManager.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.server.security.delegation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+/**
+ * Manages an internal list of secret keys used to sign new authentication 
tokens as they are generated, and to validate existing tokens used for
+ * authentication.
+ *
+ * Each TabletServer, in addition to the Master, has an instance of this 
{@link SecretManager} so that each can authenticate requests from clients 
presenting
+ * delegation tokens. The Master will also run an instance of {@link 
AuthenticationTokenKeyManager} which handles generation of new keys and removal 
of old
+ * keys. That class will call the methods here to ensure the in-memory cache 
is consistent with what is advertised in ZooKeeper.
+ */
+public class AuthenticationTokenSecretManager extends 
SecretManager<AuthenticationTokenIdentifier> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AuthenticationTokenSecretManager.class);
+
+  private final Instance instance;
+  private final long tokenMaxLifetime;
+  private final ConcurrentHashMap<Integer,AuthenticationKey> allKeys = new 
ConcurrentHashMap<Integer,AuthenticationKey>();
+  private AuthenticationKey currentKey;
+
+  /**
+   * Builds a secret manager bound to the given instance.
+   *
+   * @param instance
+   *          Accumulo instance; must not be null
+   * @param tokenMaxLifetime
+   *          Maximum age (in milliseconds) before a token expires and is no longer valid; must be positive
+   */
+  public AuthenticationTokenSecretManager(Instance instance, long tokenMaxLifetime) {
+    this.instance = checkNotNull(instance);
+    checkArgument(tokenMaxLifetime > 0, "Max lifetime must be positive");
+    this.tokenMaxLifetime = tokenMaxLifetime;
+  }
+
+  /**
+   * Fills in the given identifier (key ID, issue date, expiration date, instance ID) and computes its password with the current {@link AuthenticationKey}.
+   * <p>
+   * The expiration date defaults to the issue date plus {@code tokenMaxLifetime}. If the identifier's {@link DelegationTokenConfig} requests a positive
+   * lifetime, the expiration date is shortened accordingly; a request that would exceed the default expiration is rejected.
+   *
+   * @param identifier
+   *          The identifier to populate and sign; it is modified by this call.
+   * @return The password computed over the serialized identifier.
+   * @throws IllegalStateException
+   *           If no current key has been set.
+   * @throws RuntimeException
+   *           If the requested token lifetime exceeds the configured maximum.
+   */
+  @Override
+  protected byte[] createPassword(AuthenticationTokenIdentifier identifier) {
+    DelegationTokenConfig cfg = identifier.getConfig();
+
+    long now = System.currentTimeMillis();
+    // currentKey is written while holding this object's monitor (addKey, removeAllKeys); read it under the same monitor so the latest value is visible
+    final AuthenticationKey secretKey;
+    synchronized (this) {
+      secretKey = currentKey;
+    }
+    // Fail with a descriptive message instead of a message-less NullPointerException
+    if (null == secretKey) {
+      throw new IllegalStateException("No current AuthenticationKey is available to create a password");
+    }
+    identifier.setKeyId(secretKey.getKeyId());
+    identifier.setIssueDate(now);
+    long expiration = now + tokenMaxLifetime;
+    // Catch overflow
+    if (expiration < now) {
+      expiration = Long.MAX_VALUE;
+    }
+    identifier.setExpirationDate(expiration);
+
+    // Limit the lifetime if the user requests it
+    if (null != cfg) {
+      long requestedLifetime = cfg.getTokenLifetime(TimeUnit.MILLISECONDS);
+      if (0 < requestedLifetime) {
+        long requestedExpirationDate = identifier.getIssueDate() + requestedLifetime;
+        // Catch overflow again
+        if (requestedExpirationDate < identifier.getIssueDate()) {
+          requestedExpirationDate = Long.MAX_VALUE;
+        }
+        // Ensure that the user doesn't try to extend the expiration date -- they may only limit it
+        if (requestedExpirationDate > identifier.getExpirationDate()) {
+          throw new RuntimeException("Requested token lifetime exceeds configured maximum");
+        }
+        log.trace("Overriding token expiration date from {} to {}", identifier.getExpirationDate(), requestedExpirationDate);
+        identifier.setExpirationDate(requestedExpirationDate);
+      }
+    }
+
+    // The instance ID must be set before the identifier is serialized for signing
+    identifier.setInstanceId(instance.getInstanceID());
+    return createPassword(identifier.getBytes(), secretKey.getKey());
+  }
+
+  /**
+   * Recomputes the password for the given identifier using the {@link AuthenticationKey} it names.
+   *
+   * @param identifier
+   *          The identifier presented with the token.
+   * @return The password regenerated from the serialized identifier and the matching key.
+   * @throws InvalidToken
+   *           If the token has expired, claims an issue date in the future, or refers to a key ID that is not in the local cache.
+   */
+  @Override
+  public byte[] retrievePassword(AuthenticationTokenIdentifier identifier) throws InvalidToken {
+    final long currentTime = System.currentTimeMillis();
+    if (currentTime > identifier.getExpirationDate()) {
+      throw new InvalidToken("Token has expired");
+    }
+    if (currentTime < identifier.getIssueDate()) {
+      throw new InvalidToken("Token issued in the future");
+    }
+
+    final AuthenticationKey signingKey = allKeys.get(identifier.getKeyId());
+    if (null != signingKey) {
+      // regenerate the password
+      return createPassword(identifier.getBytes(), signingKey.getKey());
+    }
+    throw new InvalidToken("Unknown master key for token (id=" + identifier.getKeyId() + ")");
+  }
+
+  /**
+   * Creates an empty identifier of the type this secret manager works with.
+   *
+   * @return A new, default-constructed {@link AuthenticationTokenIdentifier}.
+   */
+  @Override
+  public AuthenticationTokenIdentifier createIdentifier() {
+    // Return our TokenIdentifier implementation
+    return new AuthenticationTokenIdentifier();
+  }
+
+  /**
+   * Generates a delegation token for the user with the provided {@code username}.
+   *
+   * @param username
+   *          The client to generate the delegation token for.
+   * @param cfg
+   *          A configuration object for obtaining the delegation token
+   * @return A delegation token for {@code username} created using the {@link #currentKey}, paired with the identifier it was built from.
+   * @throws AccumuloException
+   *           If the password for the token could not be created; the underlying exception is attached as the cause.
+   */
+  public Entry<Token<AuthenticationTokenIdentifier>,AuthenticationTokenIdentifier> generateToken(String username, DelegationTokenConfig cfg)
+      throws AccumuloException {
+    checkNotNull(username);
+    checkNotNull(cfg);
+
+    final AuthenticationTokenIdentifier id = new AuthenticationTokenIdentifier(username, cfg);
+
+    // NOTE(review): createPassword(id) below is what assigns the instance ID, so unless the identifier constructor sets one this branch is not taken -- confirm
+    final StringBuilder svcName = new StringBuilder(DelegationToken.SERVICE_NAME);
+    if (null != id.getInstanceId()) {
+      svcName.append("-").append(id.getInstanceId());
+    }
+    // Create password will update the state on the identifier given currentKey. Need to call this before serializing the identifier
+    byte[] password;
+    try {
+      password = createPassword(id);
+    } catch (RuntimeException e) {
+      // Keep the original exception as the cause so the failure can be diagnosed from the stack trace
+      throw new AccumuloException(e.getMessage(), e);
+    }
+    // The use of the ServiceLoader inside Token doesn't work to automatically get the Identifier
+    // Explicitly returning the identifier also saves an extra deserialization
+    Token<AuthenticationTokenIdentifier> token = new Token<AuthenticationTokenIdentifier>(id.getBytes(), password, id.getKind(), new Text(svcName.toString()));
+    return Maps.immutableEntry(token, id);
+  }
+
+  /**
+   * Stores the provided {@code key} in the in-memory map of all {@link AuthenticationKey}s, replacing any entry with the same key ID. The key also becomes the
+   * current key when no current key is set or when its ID is greater than the current key's ID.
+   *
+   * @param key
+   *          The key to add.
+   */
+  public synchronized void addKey(AuthenticationKey key) {
+    checkNotNull(key);
+
+    final int keyId = key.getKeyId();
+    log.debug("Adding AuthenticationKey with keyId {}", keyId);
+
+    allKeys.put(keyId, key);
+
+    final boolean supersedesCurrent = currentKey == null || keyId > currentKey.getKeyId();
+    if (supersedesCurrent) {
+      currentKey = key;
+    }
+  }
+
+  /**
+   * Removes the {@link AuthenticationKey} from the local cache of keys using the provided {@code keyId}.
+   *
+   * @param keyId
+   *          The unique ID for the {@link AuthenticationKey} to remove.
+   * @return True if the key was removed, otherwise false.
+   */
+  synchronized boolean removeKey(Integer keyId) {
+    checkNotNull(keyId);
+
+    log.debug("Removing AuthenticationKey with keyId {}", keyId);
+
+    // remove() returns the previous mapping, which is null when nothing was cached for this ID
+    return null != allKeys.remove(keyId);
+  }
+
+  /**
+   * The current {@link AuthenticationKey}, may be null.
+   * <p>
+   * NOTE(review): unlike {@link #isCurrentKeySet()}, this accessor is not {@code synchronized} -- confirm that test callers do not depend on observing
+   * concurrent updates.
+   *
+   * @return The current key, or null.
+   */
+  @VisibleForTesting
+  AuthenticationKey getCurrentKey() {
+    return currentKey;
+  }
+
+  /**
+   * Exposes the map of key ID to {@link AuthenticationKey} for tests.
+   *
+   * @return The {@link #allKeys} map itself, not a copy.
+   */
+  @VisibleForTesting
+  Map<Integer,AuthenticationKey> getKeys() {
+    return allKeys;
+  }
+
+  /**
+   * Inspect each key cached in {@link #allKeys} and remove it if the 
expiration date has passed. For each removed local {@link AuthenticationKey}, 
the key is
+   * also removed from ZooKeeper using the provided {@code keyDistributor} 
instance.
+   *
+   * @param keyDistributor
+   *          ZooKeeper key distribution class
+   */
+  synchronized int removeExpiredKeys(ZooAuthenticationKeyDistributor 
keyDistributor) {
+    long now = System.currentTimeMillis();
+    int keysRemoved = 0;
+    Iterator<Entry<Integer,AuthenticationKey>> iter = 
allKeys.entrySet().iterator();
+    while (iter.hasNext()) {
+      Entry<Integer,AuthenticationKey> entry = iter.next();
+      AuthenticationKey key = entry.getValue();
+      if (key.getExpirationDate() < now) {
+        log.debug("Removing expired delegation token key {}", key.getKeyId());
+        iter.remove();
+        keysRemoved++;
+        try {
+          keyDistributor.remove(key);
+        } catch (KeeperException | InterruptedException e) {
+          log.error("Failed to remove AuthenticationKey from ZooKeeper. 
Exiting", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return keysRemoved;
+  }
+
+  /**
+   * Reports whether a current {@link AuthenticationKey} is present.
+   *
+   * @return True if {@code currentKey} is non-null, otherwise false.
+   */
+  synchronized boolean isCurrentKeySet() {
+    return null != currentKey;
+  }
+
+  /**
+   * Clears the map of all {@link AuthenticationKey}s and unsets the current key. Both steps happen while holding this object's monitor.
+   */
+  public synchronized void removeAllKeys() {
+    allKeys.clear();
+    currentKey = null;
+  }
+
+  /**
+   * Delegates to the superclass implementation of {@code generateSecret()}.
+   *
+   * @return The secret key produced by the superclass.
+   */
+  @Override
+  protected SecretKey generateSecret() {
+    // The parent method lives in a different package; this explicit override lets classes in our package call it directly.
+    return super.generateSecret();
+  }
+
+  /**
+   * Static pass-through to {@code SecretManager.createSecretKey(byte[])}.
+   *
+   * @param raw
+   *          The raw bytes handed to the parent's factory method.
+   * @return The {@link SecretKey} returned by the parent's factory method.
+   */
+  public static SecretKey createSecretKey(byte[] raw) {
+    return SecretManager.createSecretKey(raw);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java
new file mode 100644
index 0000000..515b036
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyDistributor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.delegation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class that manages distribution of {@link AuthenticationKey}s, Accumulo's 
secret in the delegation token model, to other Accumulo nodes via ZooKeeper.
+ */
+public class ZooAuthenticationKeyDistributor {
+  private static final Logger log = 
LoggerFactory.getLogger(ZooAuthenticationKeyDistributor.class);
+
+  private final ZooReaderWriter zk;
+  private final String baseNode;
+  private boolean initialized = false;
+
+  /**
+   * Creates a distributor that stores keys beneath {@code baseNode}. {@link #initialize()} must be called before keys can be read, advertised or removed.
+   *
+   * @param zk
+   *          ZooKeeper reader/writer used for all operations; must not be null
+   * @param baseNode
+   *          Parent znode for the serialized keys; must not be null
+   */
+  public ZooAuthenticationKeyDistributor(ZooReaderWriter zk, String baseNode) {
+    this.zk = checkNotNull(zk);
+    this.baseNode = checkNotNull(baseNode);
+  }
+
+  /**
+   * Ensures that ZooKeeper is in a correct state to perform distribution of {@link AuthenticationKey}s. The base node is created when it is absent; when it
+   * already exists, its ACL is checked before it is accepted. Calling this method again after a successful initialization is a no-op.
+   *
+   * @throws IllegalStateException
+   *           If the existing base node does not carry the expected ACL.
+   */
+  public synchronized void initialize() throws KeeperException, InterruptedException {
+    if (initialized) {
+      return;
+    }
+
+    // Base node is absent: create it via putPrivatePersistentData and we are done
+    if (!zk.exists(baseNode)) {
+      final boolean created = zk.putPrivatePersistentData(baseNode, new byte[0], NodeExistsPolicy.FAIL);
+      if (!created) {
+        throw new AssertionError("Got false from putPrivatePersistentData method");
+      }
+      initialized = true;
+      return;
+    }
+
+    // Base node already exists: only accept it when its single ACL looks like the one from ZooUtil.PRIVATE
+    final List<ACL> acls = zk.getACL(baseNode, new Stat());
+    if (1 != acls.size()) {
+      log.error("Saw more than one ACL on the node");
+    } else {
+      final ACL actualAcl = acls.get(0);
+      final ACL expectedAcl = ZooUtil.PRIVATE.get(0);
+      final Id actualId = actualAcl.getId();
+      final boolean permsMatch = actualAcl.getPerms() == expectedAcl.getPerms();
+      if (permsMatch && actualId.getScheme().equals("digest") && actualId.getId().startsWith("accumulo:")) {
+        initialized = true;
+        return;
+      }
+    }
+
+    log.error("Expected {} to have ACLs {} but was {}", baseNode, ZooUtil.PRIVATE, acls);
+    throw new IllegalStateException("Delegation token secret key node in ZooKeeper is not protected.");
+  }
+
+  /**
+   * Fetch all {@link AuthenticationKey}s currently stored in ZooKeeper 
beneath the configured {@code baseNode}.
+   *
+   * @return A list of {@link AuthenticationKey}s
+   */
+  public List<AuthenticationKey> getCurrentKeys() throws KeeperException, 
InterruptedException {
+    checkState(initialized, "Not initialized");
+    List<String> children = zk.getChildren(baseNode);
+
+    // Shortcircuit to avoid a list creation
+    if (children.isEmpty()) {
+      return Collections.<AuthenticationKey> emptyList();
+    }
+
+    // Deserialize each byte[] into an AuthenticationKey
+    List<AuthenticationKey> keys = new ArrayList<>(children.size());
+    for (String child : children) {
+      byte[] data = zk.getData(qualifyPath(child), null);
+      if (null != data) {
+        AuthenticationKey key = new AuthenticationKey();
+        try {
+          key.readFields(new DataInputStream(new ByteArrayInputStream(data)));
+        } catch (IOException e) {
+          throw new AssertionError("Error reading from in-memory buffer which 
should not happen", e);
+        }
+        keys.add(key);
+      }
+    }
+
+    return keys;
+  }
+
+  /**
+   * Serializes the given {@link AuthenticationKey} and writes it to ZooKeeper beneath the base node. If a node for the key's ID already exists, a warning is
+   * logged and nothing is written.
+   *
+   * @param newKey
+   *          The key to add to ZooKeeper
+   * @throws IllegalStateException
+   *           If {@link #initialize()} has not completed.
+   */
+  public synchronized void advertise(AuthenticationKey newKey) throws KeeperException, InterruptedException {
+    checkState(initialized, "Not initialized");
+    checkNotNull(newKey);
+
+    // Never overwrite a key that is already published
+    final String keyPath = qualifyPath(newKey);
+    if (zk.exists(keyPath)) {
+      log.warn("AuthenticationKey with ID '{}' already exists in ZooKeeper", newKey.getKeyId());
+      return;
+    }
+
+    // Serialize the key into an in-memory buffer
+    final ByteArrayOutputStream buffer = new ByteArrayOutputStream(4096);
+    final DataOutputStream out = new DataOutputStream(buffer);
+    try {
+      newKey.write(out);
+    } catch (IOException e) {
+      throw new AssertionError("Should not get exception writing to in-memory buffer", e);
+    }
+
+    log.debug("Advertising AuthenticationKey with keyId {} in ZooKeeper at {}", newKey.getKeyId(), keyPath);
+
+    // Put it into ZK with the private ACL
+    zk.putPrivatePersistentData(keyPath, buffer.toByteArray(), NodeExistsPolicy.FAIL);
+  }
+
+  /**
+   * Deletes the node for the given {@link AuthenticationKey} from ZooKeeper. If no node exists for the provided {@code key}, a warning is logged and no error
+   * is thrown. Since there is only a single process managing ZooKeeper at one time, any inconsistencies should be client error.
+   *
+   * @param key
+   *          The key to remove from ZooKeeper
+   * @throws IllegalStateException
+   *           If {@link #initialize()} has not completed.
+   */
+  public synchronized void remove(AuthenticationKey key) throws KeeperException, InterruptedException {
+    checkState(initialized, "Not initialized");
+    checkNotNull(key);
+
+    final String keyPath = qualifyPath(key);
+    if (zk.exists(keyPath)) {
+      log.debug("Removing AuthenticationKey with keyId {} from ZooKeeper at {}", key.getKeyId(), keyPath);
+
+      // Delete the node, any version
+      zk.delete(keyPath, -1);
+    } else {
+      log.warn("AuthenticationKey with ID '{}' doesn't exist in ZooKeeper", key.getKeyId());
+    }
+  }
+
+  /**
+   * Builds the ZooKeeper path for a key node by appending {@code keyId} to the base node with a '/' separator.
+   *
+   * @param keyId
+   *          The child node name
+   * @return {@code baseNode + "/" + keyId}
+   */
+  String qualifyPath(String keyId) {
+    return baseNode + "/" + keyId;
+  }
+
+  /**
+   * Builds the ZooKeeper path for the given key, using the decimal string form of its key ID as the child node name.
+   *
+   * @param key
+   *          The key whose node path is wanted
+   * @return The path of the key's node beneath the base node
+   */
+  String qualifyPath(AuthenticationKey key) {
+    return qualifyPath(Integer.toString(key.getKeyId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java
new file mode 100644
index 0000000..2913343
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watch ZooKeeper to notice changes in the published keys so that authentication can properly occur using delegation tokens.
+ */
+public class ZooAuthenticationKeyWatcher implements Watcher {
+  private static final Logger log = 
LoggerFactory.getLogger(ZooAuthenticationKeyWatcher.class);
+
+  private final AuthenticationTokenSecretManager secretManager;
+  private final ZooReader zk;
+  private final String baseNode;
+
+  /**
+   * Creates a watcher that applies key changes observed at or beneath {@code baseNode} to the given secret manager.
+   *
+   * @param secretManager
+   *          The secret manager whose key cache is updated
+   * @param zk
+   *          ZooKeeper reader used to fetch children and node data and to register this watcher
+   * @param baseNode
+   *          The znode beneath which the keys are stored
+   */
+  public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager secretManager, ZooReader zk, String baseNode) {
+    this.secretManager = secretManager;
+    this.zk = zk;
+    this.baseNode = baseNode;
+  }
+
+  /**
+   * Dispatches a ZooKeeper event. Connection-state events ({@link EventType#None}) either clear the secret manager (disconnect, expiration) or reload the keys
+   * (reconnect). Events for a path at or beneath the base node are forwarded to {@link #processBaseNode(WatchedEvent)} or
+   * {@link #processChildNode(WatchedEvent)}; events without a path or for any other path are ignored. ZooKeeper failures are logged and not rethrown.
+   *
+   * @param event
+   *          The event delivered by ZooKeeper
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    if (EventType.None == event.getType()) {
+      switch (event.getState()) {
+        case Disconnected: // Intentional fall through of case
+        case Expired: // ZooReader is handling the Expiration of the original ZooKeeper object for us
+          log.debug("ZooKeeper connection disconnected, clearing secret manager");
+          secretManager.removeAllKeys();
+          break;
+        case SyncConnected:
+          log.debug("ZooKeeper reconnected, updating secret manager");
+          try {
+            updateAuthKeys();
+          } catch (KeeperException | InterruptedException e) {
+            // Log the exception as well so the reason for the failed refresh is not lost
+            log.error("Failed to update secret manager after ZooKeeper reconnect", e);
+          }
+          break;
+        default:
+          log.warn("Unhandled: " + event);
+      }
+
+      // Nothing more to do for EventType.None
+      return;
+    }
+
+    String path = event.getPath();
+    if (null == path) {
+      return;
+    }
+
+    if (!path.startsWith(baseNode)) {
+      log.info("Ignoring event for path: {}", path);
+      return;
+    }
+
+    try {
+      if (path.equals(baseNode)) {
+        processBaseNode(event);
+      } else {
+        processChildNode(event);
+      }
+    } catch (KeeperException | InterruptedException e) {
+      log.error("Failed to communicate with ZooKeeper", e);
+    }
+  }
+
+  /**
+   * Process the {@link WatchedEvent} for the base znode that the {@link 
AuthenticationKey}s are stored in.
+   */
+  void processBaseNode(WatchedEvent event) throws KeeperException, 
InterruptedException {
+    switch (event.getType()) {
+      case NodeDeleted:
+        // The parent node was deleted, no children are possible, remove all 
keys
+        log.debug("Parent ZNode was deleted, removing all AuthenticationKeys");
+        secretManager.removeAllKeys();
+        break;
+      case None:
+        // Not connected, don't care
+        break;
+      case NodeCreated: // intentional fall-through to NodeChildrenChanged
+      case NodeChildrenChanged:
+        // Process each child, and reset the watcher on the parent node. We 
know that the node exists
+        updateAuthKeys(event.getPath());
+        break;
+      case NodeDataChanged:
+        // The data on the parent changed. We aren't storing anything there so 
it's a noop
+        break;
+      default:
+        log.warn("Unsupported event type: {}", event.getType());
+        break;
+    }
+  }
+
+  /**
+   * Entry point to seed the local {@link AuthenticationKey} cache from ZooKeeper and set the first watcher for future updates in ZooKeeper. When the base node
+   * does not exist, only the watcher is registered and no keys are loaded.
+   */
+  public void updateAuthKeys() throws KeeperException, InterruptedException {
+    // Might cause two watchers on baseNode, but only at startup for each tserver.
+    if (!zk.exists(baseNode, this)) {
+      return;
+    }
+
+    final int loaded = updateAuthKeys(baseNode);
+    log.info("Added {} existing AuthenticationKeys to local cache from ZooKeeper", loaded);
+  }
+
+  private int updateAuthKeys(String path) throws KeeperException, 
InterruptedException {
+    int keysAdded = 0;
+    for (String child : zk.getChildren(path, this)) {
+      String childPath = path + "/" + child;
+      // Get the node data and reset the watcher
+      AuthenticationKey key = deserializeKey(zk.getData(childPath, this, 
null));
+      secretManager.addKey(key);
+      keysAdded++;
+    }
+    return keysAdded;
+  }
+
+  /**
+   * Handles a {@link WatchedEvent} raised for a node which represents an {@link AuthenticationKey}. Created or changed nodes are read (which also resets the
+   * watcher) and added to the secret manager; a deleted node removes the key whose ID is the node's name.
+   *
+   * @param event
+   *          The event for a child of the base node
+   */
+  void processChildNode(WatchedEvent event) throws KeeperException, InterruptedException {
+    final String nodePath = event.getPath();
+    switch (event.getType()) {
+      case None:
+        // Not connected, don't care. We'll update when we're reconnected
+        break;
+      case NodeCreated: {
+        // New key created
+        if (null == nodePath) {
+          log.error("Got null path for NodeCreated event");
+          return;
+        }
+        // Get the data and reset the watcher
+        final AuthenticationKey createdKey = deserializeKey(zk.getData(nodePath, this, null));
+        log.debug("Adding AuthenticationKey with keyId {}", createdKey.getKeyId());
+        secretManager.addKey(createdKey);
+        break;
+      }
+      case NodeDataChanged: {
+        // Key changed, could happen on restart after not running Accumulo.
+        if (null == nodePath) {
+          log.error("Got null path for NodeDataChanged event");
+          return;
+        }
+        // Get the data and reset the watcher
+        final AuthenticationKey changedKey = deserializeKey(zk.getData(nodePath, this, null));
+        // Will overwrite the old key if one exists
+        secretManager.addKey(changedKey);
+        break;
+      }
+      case NodeDeleted: {
+        // Key expired
+        if (null == nodePath) {
+          log.error("Got null path for NodeDeleted event");
+          return;
+        }
+        // Pull off the base ZK path and the '/' separator; what remains is the key ID
+        final String keyIdText = nodePath.substring(baseNode.length() + 1);
+        secretManager.removeKey(Integer.parseInt(keyIdText));
+        break;
+      }
+      case NodeChildrenChanged:
+        // no children for the children..
+        log.warn("Unexpected NodeChildrenChanged event for authentication key node {}", nodePath);
+        break;
+      default:
+        log.warn("Unsupported event type: {}", event.getType());
+        break;
+    }
+  }
+
+  /**
+   * Deserialize the bytes into an {@link AuthenticationKey}
+   *
+   * @param serializedKey
+   *          The serialized form of the key, as read from ZooKeeper
+   * @return The key populated from {@code serializedKey}
+   * @throws AssertionError
+   *           If reading from the in-memory buffer fails; the {@link IOException} is attached as the cause.
+   */
+  AuthenticationKey deserializeKey(byte[] serializedKey) {
+    AuthenticationKey key = new AuthenticationKey();
+    try {
+      key.readFields(new DataInputStream(new ByteArrayInputStream(serializedKey)));
+    } catch (IOException e) {
+      // Keep the IOException as the cause so a corrupt or truncated key can be diagnosed
+      throw new AssertionError("Failed to read from an in-memory buffer", e);
+    }
+    return key;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
index 08fa55b..369fa89 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java
@@ -27,6 +27,7 @@ import 
org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
 import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.SiteConfiguration;
@@ -131,7 +132,7 @@ public class KerberosAuthenticator implements Authenticator 
{
     }
 
     // User is authenticated at the transport layer -- nothing extra is 
necessary
-    if (token instanceof KerberosToken) {
+    if (token instanceof KerberosToken || token instanceof DelegationToken) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
index 49a60a6..92b6be8 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.accumulo.server;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -24,12 +28,15 @@ import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -69,17 +76,27 @@ public class AccumuloServerContextTest {
         final AccumuloConfiguration conf = 
ClientContext.convertClientConfig(clientConf);
         SiteConfiguration siteConfig = 
EasyMock.createMock(SiteConfiguration.class);
 
+        
EasyMock.expect(siteConfig.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)).andReturn(true);
+
+        // Deal with SystemToken being private
+        PasswordToken pw = new PasswordToken("fake");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        pw.write(new DataOutputStream(baos));
+        SystemToken token = new SystemToken();
+        token.readFields(new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray())));
+
         ServerConfigurationFactory factory = 
EasyMock.createMock(ServerConfigurationFactory.class);
         EasyMock.expect(factory.getConfiguration()).andReturn(conf).anyTimes();
         
EasyMock.expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes();
         EasyMock.expect(factory.getInstance()).andReturn(instance).anyTimes();
 
         AccumuloServerContext context = 
EasyMock.createMockBuilder(AccumuloServerContext.class).addMockedMethod("enforceKerberosLogin")
-            
.addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").createMock();
+            
.addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").addMockedMethod("getCredentials").createMock();
         context.enforceKerberosLogin();
         EasyMock.expectLastCall().anyTimes();
         EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
         
EasyMock.expect(context.getServerConfigurationFactory()).andReturn(factory).anyTimes();
+        EasyMock.expect(context.getCredentials()).andReturn(new 
Credentials("accumulo/hostn...@fake.com", token)).once();
 
         // Just make the SiteConfiguration delegate to our ClientConfiguration 
(by way of the AccumuloConfiguration)
         // Presently, we only need get(Property) and iterator().
@@ -101,8 +118,8 @@ public class AccumuloServerContextTest {
         EasyMock.replay(factory, context, siteConfig);
 
         Assert.assertEquals(ThriftServerType.SASL, 
context.getThriftServerType());
-        SaslConnectionParams saslParams = context.getServerSaslParams();
-        Assert.assertEquals(SaslConnectionParams.forConfig(conf), saslParams);
+        SaslServerConnectionParams saslParams = context.getSaslParams();
+        Assert.assertEquals(new SaslServerConnectionParams(conf, token), 
saslParams);
         Assert.assertEquals(username, saslParams.getPrincipal());
 
         EasyMock.verify(factory, context, siteConfig);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java
new file mode 100644
index 0000000..6c965ff
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslDigestCallbackHandlerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import javax.crypto.KeyGenerator;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.rpc.SaslDigestCallbackHandler;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
+import org.apache.accumulo.server.security.delegation.AuthenticationKey;
+import 
org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class SaslDigestCallbackHandlerTest {
+
+  /**
+   * Test-only subclass that allows access to the methods on SaslDigestCallbackHandler; handle() always throws UnsupportedOperationException
+   */
+  private static class SaslTestDigestCallbackHandler extends 
SaslDigestCallbackHandler {
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, 
UnsupportedCallbackException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  // From org.apache.hadoop.security.token.SecretManager
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
+  private static final int KEY_LENGTH = 64;
+  private static KeyGenerator keyGen;
+
+  @BeforeClass
+  public static void setupKeyGenerator() throws Exception {
+    // From org.apache.hadoop.security.token.SecretManager
+    keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
+    keyGen.init(KEY_LENGTH);
+  }
+
+  private SaslTestDigestCallbackHandler handler;
+  private DelegationTokenConfig cfg;
+
+  @Before
+  public void setup() {
+    handler = new SaslTestDigestCallbackHandler();
+    cfg = new DelegationTokenConfig();
+  }
+
+  @Test
+  public void testIdentifierSerialization() throws IOException {
+    AuthenticationTokenIdentifier identifier = new 
AuthenticationTokenIdentifier("user", 1, 100l, 1000l, "instanceid");
+    byte[] serialized = identifier.getBytes();
+    String name = handler.encodeIdentifier(serialized);
+
+    byte[] reserialized = handler.decodeIdentifier(name);
+    assertArrayEquals(serialized, reserialized);
+
+    AuthenticationTokenIdentifier copy = new AuthenticationTokenIdentifier();
+    copy.readFields(new DataInputStream(new 
ByteArrayInputStream(reserialized)));
+
+    assertEquals(identifier, copy);
+  }
+
+  @Test
+  public void testTokenSerialization() throws Exception {
+    Instance instance = createMock(Instance.class);
+    AuthenticationTokenSecretManager secretManager = new 
AuthenticationTokenSecretManager(instance, 1000l);
+    expect(instance.getInstanceID()).andReturn("instanceid");
+
+    replay(instance);
+
+    secretManager.addKey(new AuthenticationKey(1, 0l, 100l, 
keyGen.generateKey()));
+    Entry<Token<AuthenticationTokenIdentifier>,AuthenticationTokenIdentifier> 
entry = secretManager.generateToken("user", cfg);
+    byte[] password = entry.getKey().getPassword();
+    char[] encodedPassword = handler.encodePassword(password);
+
+    char[] computedPassword = handler.getPassword(secretManager, 
entry.getValue());
+
+    verify(instance);
+
+    assertArrayEquals(computedPassword, encodedPassword);
+  }
+
+  @Test
+  public void testTokenAndIdentifierSerialization() throws Exception {
+    Instance instance = createMock(Instance.class);
+    AuthenticationTokenSecretManager secretManager = new 
AuthenticationTokenSecretManager(instance, 1000l);
+    expect(instance.getInstanceID()).andReturn("instanceid");
+
+    replay(instance);
+
+    secretManager.addKey(new AuthenticationKey(1, 0l, 1000 * 100l, 
keyGen.generateKey()));
+    Entry<Token<AuthenticationTokenIdentifier>,AuthenticationTokenIdentifier> 
entry = secretManager.generateToken("user", cfg);
+    byte[] password = entry.getKey().getPassword();
+    char[] encodedPassword = handler.encodePassword(password);
+    String name = handler.encodeIdentifier(entry.getValue().getBytes());
+
+    byte[] decodedIdentifier = handler.decodeIdentifier(name);
+    AuthenticationTokenIdentifier identifier = new 
AuthenticationTokenIdentifier();
+    identifier.readFields(new DataInputStream(new 
ByteArrayInputStream(decodedIdentifier)));
+    char[] computedPassword = handler.getPassword(secretManager, identifier);
+
+    verify(instance);
+
+    assertArrayEquals(computedPassword, encodedPassword);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java
new file mode 100644
index 0000000..39bf9e4
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/SaslServerConnectionParamsTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.QualityOfProtection;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SaslServerConnectionParamsTest {
+
+  private UserGroupInformation testUser;
+  private String username;
+
+  @Before
+  public void setup() throws Exception {
+    System.setProperty("java.security.krb5.realm", "accumulo");
+    System.setProperty("java.security.krb5.kdc", "fake");
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    testUser = UserGroupInformation.createUserForTesting("test_user", new 
String[0]);
+    username = testUser.getUserName();
+  }
+
+  @Test
+  public void testDefaultParamsAsServer() throws Exception {
+    testUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        // Deal with SystemToken being private
+        PasswordToken pw = new PasswordToken("fake");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        pw.write(new DataOutputStream(baos));
+        SystemToken token = new SystemToken();
+        token.readFields(new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray())));
+
+        final SaslConnectionParams saslParams = new 
SaslServerConnectionParams(rpcConf, token);
+        assertEquals(primary, saslParams.getKerberosServerPrimary());
+        assertEquals(SaslMechanism.GSSAPI, saslParams.getMechanism());
+        assertNull(saslParams.getCallbackHandler());
+
+        final QualityOfProtection defaultQop = 
QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
+        assertEquals(defaultQop, saslParams.getQualityOfProtection());
+
+        Map<String,String> properties = saslParams.getSaslProperties();
+        assertEquals(1, properties.size());
+        assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP));
+        assertEquals(username, saslParams.getPrincipal());
+        return null;
+      }
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java
new file mode 100644
index 0000000..02e22aa
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationKeyTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.delegation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class AuthenticationKeyTest {
+  // From org.apache.hadoop.security.token.SecretManager
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
+  private static final int KEY_LENGTH = 64;
+  private static KeyGenerator keyGen;
+
+  @BeforeClass
+  public static void setupKeyGenerator() throws Exception {
+    // From org.apache.hadoop.security.token.SecretManager
+    keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
+    keyGen.init(KEY_LENGTH);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullSecretKey() {
+    new AuthenticationKey(0, 0, 0, null);
+  }
+
+  @Test
+  public void testAuthKey() {
+    SecretKey secretKey = keyGen.generateKey();
+    int keyId = 20;
+    long creationDate = 38383838l, expirationDate = 83838383l;
+    AuthenticationKey authKey = new AuthenticationKey(keyId, creationDate, 
expirationDate, secretKey);
+    assertEquals(secretKey, authKey.getKey());
+    assertEquals(keyId, authKey.getKeyId());
+    assertEquals(expirationDate, authKey.getExpirationDate());
+
+    // Empty instance
+    AuthenticationKey badCopy = new AuthenticationKey();
+
+    assertNotEquals(badCopy, authKey);
+    assertNotEquals(badCopy.hashCode(), authKey.hashCode());
+
+    // Different object, same arguments
+    AuthenticationKey goodCopy = new AuthenticationKey(keyId, creationDate, 
expirationDate, secretKey);
+    assertEquals(authKey, goodCopy);
+    assertEquals(authKey.hashCode(), goodCopy.hashCode());
+  }
+
+  @Test
+  public void testWritable() throws IOException {
+    SecretKey secretKey = keyGen.generateKey();
+    int keyId = 20;
+    long creationDate = 38383838l, expirationDate = 83838383l;
+    AuthenticationKey authKey = new AuthenticationKey(keyId, creationDate, 
expirationDate, secretKey);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    authKey.write(out);
+    byte[] serialized = baos.toByteArray();
+
+    DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(serialized));
+    AuthenticationKey copy = new AuthenticationKey();
+    copy.readFields(in);
+
+    assertEquals(authKey, copy);
+    assertEquals(authKey.hashCode(), copy.hashCode());
+    assertEquals(secretKey, copy.getKey());
+    assertEquals(keyId, copy.getKeyId());
+    assertEquals(expirationDate, copy.getExpirationDate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java
new file mode 100644
index 0000000..bc2968a
--- /dev/null
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/security/delegation/AuthenticationTokenKeyManagerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.security.delegation;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthenticationTokenKeyManagerTest {
+  private static final Logger log = 
LoggerFactory.getLogger(AuthenticationTokenKeyManagerTest.class);
+
+  // From org.apache.hadoop.security.token.SecretManager
+  private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
+  private static final int KEY_LENGTH = 64;
+  private static KeyGenerator keyGen;
+
+  @BeforeClass
+  public static void setupKeyGenerator() throws Exception {
+    // From org.apache.hadoop.security.token.SecretManager
+    keyGen = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
+    keyGen.init(KEY_LENGTH);
+  }
+
+  private AuthenticationTokenSecretManager secretManager;
+  private ZooAuthenticationKeyDistributor zooDistributor;
+
+  @Before
+  public void setupMocks() {
+    secretManager = createMock(AuthenticationTokenSecretManager.class);
+    zooDistributor = createMock(ZooAuthenticationKeyDistributor.class);
+  }
+
+  @Test
+  public void testIntervalNotPassed() {
+    long updateInterval = 5 * 1000l;
+    long tokenLifetime = 100 * 1000l;
+    AuthenticationTokenKeyManager keyManager = new 
AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, 
tokenLifetime);
+
+    // Have never updated the key
+    assertEquals(0l, keyManager.getLastKeyUpdate());
+
+    // Always check for expired keys to remove
+    expect(secretManager.removeExpiredKeys(zooDistributor)).andReturn(0);
+
+    replay(secretManager, zooDistributor);
+
+    // Run at time 0. Last run time is still 0. 0 + 5000 > 0, so we won't 
generate a new key
+    keyManager._run(0);
+
+    verify(secretManager, zooDistributor);
+  }
+
+  @Test
+  public void testIntervalHasPassed() throws Exception {
+    long updateInterval = 0 * 1000l;
+    long tokenLifetime = 100 * 1000l;
+    long runTime = 10l;
+    SecretKey secretKey = keyGen.generateKey();
+
+    AuthenticationKey authKey = new AuthenticationKey(1, runTime, runTime + 
tokenLifetime, secretKey);
+    AuthenticationTokenKeyManager keyManager = new 
AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, 
tokenLifetime);
+
+    // Have never updated the key
+    assertEquals(0l, keyManager.getLastKeyUpdate());
+
+    // Always check for expired keys to remove
+    expect(secretManager.removeExpiredKeys(zooDistributor)).andReturn(0);
+    expect(secretManager.generateSecret()).andReturn(secretKey);
+    secretManager.addKey(authKey);
+    expectLastCall().once();
+    zooDistributor.advertise(authKey);
+    expectLastCall().once();
+
+    replay(secretManager, zooDistributor);
+
+    // Run at time 10. Last key update is still 0 and the update interval is 0: 0 + 0 < 10, so we will 
generate a new key
+    keyManager._run(runTime);
+
+    verify(secretManager, zooDistributor);
+
+    // Last key update time should match when we ran
+    assertEquals(runTime, keyManager.getLastKeyUpdate());
+    // KeyManager uses the incremented value for the new AuthKey (the current 
idSeq will match the keyId for the last generated key)
+    assertEquals(authKey.getKeyId(), keyManager.getIdSeq());
+  }
+
+  @Test(timeout = 30 * 1000)
+  public void testStopLoop() throws InterruptedException {
+    final AuthenticationTokenKeyManager keyManager = 
EasyMock.createMockBuilder(AuthenticationTokenKeyManager.class).addMockedMethod("_run")
+        .addMockedMethod("updateStateFromCurrentKeys").createMock();
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // Mock out the _run and updateStateFromCurrentKeys methods so we just get 
the logic from "run()"
+    keyManager._run(EasyMock.anyLong());
+    expectLastCall().once();
+    keyManager.updateStateFromCurrentKeys();
+    expectLastCall().once();
+
+    replay(keyManager);
+
+    keyManager.setKeepRunning(true);
+
+    // Wrap another Runnable around our KeyManager so we know when the thread 
is actually running: run() is invoked asynchronously, at some point after we 
call
+    // thread.start()
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        log.info("Thread running");
+        latch.countDown();
+        keyManager.run();
+      }
+    });
+
+    log.info("Starting thread");
+    t.start();
+
+    // Wait for the thread to start
+    latch.await();
+    log.info("Latch fired");
+
+    // Wait a little bit to let the first call to _run() happen (avoid exiting 
the loop before any calls to _run())
+    Thread.sleep(1000);
+
+    log.info("Finished waiting, stopping keymanager");
+
+    keyManager.gracefulStop();
+
+    log.info("Waiting for thread to exit naturally");
+
+    t.join();
+
+    verify(keyManager);
+  }
+
+  @Test
+  public void testExistingKeysAreAddedAtStartup() throws Exception {
+    long updateInterval = 0 * 1000l;
+    long tokenLifetime = 100 * 1000l;
+    SecretKey secretKey1 = keyGen.generateKey(), secretKey2 = 
keyGen.generateKey();
+
+    AuthenticationKey authKey1 = new AuthenticationKey(1, 0, tokenLifetime, 
secretKey1), authKey2 = new AuthenticationKey(2, tokenLifetime, tokenLifetime * 
2,
+        secretKey2);
+    AuthenticationTokenKeyManager keyManager = new 
AuthenticationTokenKeyManager(secretManager, zooDistributor, updateInterval, 
tokenLifetime);
+
+    // Have never updated the key
+    assertEquals(0l, keyManager.getLastKeyUpdate());
+
+    // The existing keys are read from the distributor and each one is added to the secret manager
+    expect(zooDistributor.getCurrentKeys()).andReturn(Arrays.asList(authKey1, 
authKey2));
+    secretManager.addKey(authKey1);
+    expectLastCall().once();
+    secretManager.addKey(authKey2);
+    expectLastCall().once();
+    expect(secretManager.getCurrentKey()).andReturn(authKey2).once();
+
+    replay(secretManager, zooDistributor);
+
+    // Initialize the state from zookeeper
+    keyManager.updateStateFromCurrentKeys();
+
+    verify(secretManager, zooDistributor);
+
+    assertEquals(authKey2.getKeyId(), keyManager.getIdSeq());
+    assertEquals(authKey2.getCreationDate(), keyManager.getLastKeyUpdate());
+  }
+}

Reply via email to