http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/security/thrift/TDelegationTokenOptions.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/security/thrift/TDelegationTokenOptions.java
 
b/core/src/main/java/org/apache/accumulo/core/security/thrift/TDelegationTokenOptions.java
new file mode 100644
index 0000000..c19eb75
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/security/thrift/TDelegationTokenOptions.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.accumulo.core.security.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"unchecked", "serial", "rawtypes", "unused"}) public class 
TDelegationTokenOptions implements 
org.apache.thrift.TBase<TDelegationTokenOptions, 
TDelegationTokenOptions._Fields>, java.io.Serializable, Cloneable, 
Comparable<TDelegationTokenOptions> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("TDelegationTokenOptions");
+
+  private static final org.apache.thrift.protocol.TField LIFETIME_FIELD_DESC = 
new org.apache.thrift.protocol.TField("lifetime", 
org.apache.thrift.protocol.TType.I64, (short)1);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new 
TDelegationTokenOptionsStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new 
TDelegationTokenOptionsTupleSchemeFactory());
+  }
+
+  public long lifetime; // optional
+
+  /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    LIFETIME((short)1, "lifetime");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not 
found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // LIFETIME
+          return LIFETIME;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __LIFETIME_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private _Fields optionals[] = {_Fields.LIFETIME};
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.LIFETIME, new 
org.apache.thrift.meta_data.FieldMetaData("lifetime", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TDelegationTokenOptions.class,
 metaDataMap);
+  }
+
+  public TDelegationTokenOptions() {
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public TDelegationTokenOptions(TDelegationTokenOptions other) {
+    __isset_bitfield = other.__isset_bitfield;
+    this.lifetime = other.lifetime;
+  }
+
+  public TDelegationTokenOptions deepCopy() {
+    return new TDelegationTokenOptions(this);
+  }
+
+  @Override
+  public void clear() {
+    setLifetimeIsSet(false);
+    this.lifetime = 0;
+  }
+
+  public long getLifetime() {
+    return this.lifetime;
+  }
+
+  public TDelegationTokenOptions setLifetime(long lifetime) {
+    this.lifetime = lifetime;
+    setLifetimeIsSet(true);
+    return this;
+  }
+
+  public void unsetLifetime() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__LIFETIME_ISSET_ID);
+  }
+
+  /** Returns true if field lifetime is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetLifetime() {
+    return EncodingUtils.testBit(__isset_bitfield, __LIFETIME_ISSET_ID);
+  }
+
+  public void setLifetimeIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__LIFETIME_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case LIFETIME:
+      if (value == null) {
+        unsetLifetime();
+      } else {
+        setLifetime((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case LIFETIME:
+      return Long.valueOf(getLifetime());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned 
a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case LIFETIME:
+      return isSetLifetime();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof TDelegationTokenOptions)
+      return this.equals((TDelegationTokenOptions)that);
+    return false;
+  }
+
+  public boolean equals(TDelegationTokenOptions that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_lifetime = true && this.isSetLifetime();
+    boolean that_present_lifetime = true && that.isSetLifetime();
+    if (this_present_lifetime || that_present_lifetime) {
+      if (!(this_present_lifetime && that_present_lifetime))
+        return false;
+      if (this.lifetime != that.lifetime)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public int compareTo(TDelegationTokenOptions other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = 
Boolean.valueOf(isSetLifetime()).compareTo(other.isSetLifetime());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetLifetime()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.lifetime, 
other.lifetime);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("TDelegationTokenOptions(");
+    boolean first = true;
+
+    if (isSetLifetime()) {
+      sb.append("lifetime:");
+      sb.append(this.lifetime);
+      first = false;
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class TDelegationTokenOptionsStandardSchemeFactory implements 
SchemeFactory {
+    public TDelegationTokenOptionsStandardScheme getScheme() {
+      return new TDelegationTokenOptionsStandardScheme();
+    }
+  }
+
+  private static class TDelegationTokenOptionsStandardScheme extends 
StandardScheme<TDelegationTokenOptions> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, 
TDelegationTokenOptions struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // LIFETIME
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.lifetime = iprot.readI64();
+              struct.setLifetimeIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, 
TDelegationTokenOptions struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.isSetLifetime()) {
+        oprot.writeFieldBegin(LIFETIME_FIELD_DESC);
+        oprot.writeI64(struct.lifetime);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class TDelegationTokenOptionsTupleSchemeFactory implements 
SchemeFactory {
+    public TDelegationTokenOptionsTupleScheme getScheme() {
+      return new TDelegationTokenOptionsTupleScheme();
+    }
+  }
+
+  private static class TDelegationTokenOptionsTupleScheme extends 
TupleScheme<TDelegationTokenOptions> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, 
TDelegationTokenOptions struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetLifetime()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetLifetime()) {
+        oprot.writeI64(struct.lifetime);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, 
TDelegationTokenOptions struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.lifetime = iprot.readI64();
+        struct.setLifetimeIsSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java 
b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
new file mode 100644
index 0000000..c79aac0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftMessageUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+/**
+ * Serializes and deserializes Thrift messages to and from byte arrays. This 
class is not thread-safe, external synchronization is necessary if it is used
+ * concurrently.
+ */
+public class ThriftMessageUtil {
+
+  private final AutoExpandingBufferWriteTransport transport;
+  private final TProtocol protocol;
+
+  public ThriftMessageUtil() {
+    this(64, 1.5);
+  }
+
+  public ThriftMessageUtil(int initialCapacity, double growthCoefficient) {
+    // TODO does this make sense? better to push this down to the serialize 
method (accept the transport as an argument)?
+    this.transport = new AutoExpandingBufferWriteTransport(initialCapacity, 
growthCoefficient);
+    this.protocol = new TCompactProtocol(transport);
+  }
+
+  /**
+   * Convert the {@link msg} to a byte array representation
+   *
+   * @param msg
+   *          The message to serialize
+   * @return The serialized message
+   * @throws IOException
+   *           When serialization fails
+   */
+  public ByteBuffer serialize(TBase<?,?> msg) throws IOException {
+    checkNotNull(msg);
+    transport.reset();
+    try {
+      msg.write(protocol);
+      // We should flush(), but we know its a noop
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    return ByteBuffer.wrap(transport.getBuf().array(), 0, transport.getPos());
+  }
+
+  /**
+   * @see #deserialize(byte[], int, int, T)
+   */
+  public <T extends TBase<?,?>> T deserialize(ByteBuffer serialized, T 
instance) throws IOException {
+    checkNotNull(serialized);
+    return deserialize(serialized.array(), serialized.arrayOffset(), 
serialized.limit(), instance);
+  }
+
+  /**
+   * Assumes the entire contents of the byte array compose the serialized 
{@link instance}
+   *
+   * @see #deserialize(byte[], int, int, TBase)
+   */
+  public <T extends TBase<?,?>> T deserialize(byte[] serialized, T instance) 
throws IOException {
+    return deserialize(serialized, 0, serialized.length, instance);
+  }
+
+  /**
+   * Deserializes a message into the provided {@link instance} from {@link 
serialized}
+   *
+   * @param serialized
+   *          The serialized representation of the object
+   * @param instance
+   *          An instance of the object to reconstitute
+   * @return The reconstituted instance provided
+   * @throws IOException
+   *           When deserialization fails
+   */
+  public <T extends TBase<?,?>> T deserialize(byte[] serialized, int offset, 
int length, T instance) throws IOException {
+    checkNotNull(instance);
+    TCompactProtocol proto = new TCompactProtocol(new 
TMemoryInputTransport(serialized, offset, length));
+    try {
+      instance.read(proto);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/thrift/master.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/master.thrift 
b/core/src/main/thrift/master.thrift
index d89e381..8a83438 100644
--- a/core/src/main/thrift/master.thrift
+++ b/core/src/main/thrift/master.thrift
@@ -173,4 +173,7 @@ service MasterClientService extends FateService {
   oneway void reportTabletStatus(5:trace.TInfo tinfo, 1:security.TCredentials 
credentials, 2:string serverName, 3:TabletLoadState status, 4:data.TKeyExtent 
tablet)
 
   list<string> getActiveTservers(1:trace.TInfo tinfo, 2:security.TCredentials 
credentials) throws (1:client.ThriftSecurityException sec)
+
+  // Delegation token request
+  security.TDelegationToken getDelegationToken(1:trace.TInfo tinfo, 
2:security.TCredentials credentials, 3:security.TDelegationTokenConfig cfg) 
throws (1:client.ThriftSecurityException sec)
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/main/thrift/security.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/security.thrift 
b/core/src/main/thrift/security.thrift
index 66235a8..74b7f12 100644
--- a/core/src/main/thrift/security.thrift
+++ b/core/src/main/thrift/security.thrift
@@ -24,3 +24,26 @@ struct TCredentials {
     4:string instanceId
 }
 
+struct TAuthenticationTokenIdentifier {
+    1:string principal,
+    2:optional i32 keyId,
+    3:optional i64 issueDate,
+    4:optional i64 expirationDate,
+    5:optional string instanceId
+}
+
+struct TAuthenticationKey {
+    1:binary secret,
+    2:optional i32 keyId,
+    3:optional i64 expirationDate,
+    4:optional i64 creationDate
+}
+
+struct TDelegationToken {
+    1:binary password,
+    2:TAuthenticationTokenIdentifier identifier
+}
+
+struct TDelegationTokenConfig {
+    1:optional i64 lifetime
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/client/admin/DelegationTokenConfigTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/admin/DelegationTokenConfigTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/admin/DelegationTokenConfigTest.java
new file mode 100644
index 0000000..f1553dc
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/admin/DelegationTokenConfigTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.admin;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+public class DelegationTokenConfigTest {
+
+  @Test
+  public void testTimeUnit() {
+    DelegationTokenConfig config1 = new DelegationTokenConfig(), config2 = new 
DelegationTokenConfig();
+
+    config1.setTokenLifetime(1000, TimeUnit.MILLISECONDS);
+    config2.setTokenLifetime(1, TimeUnit.SECONDS);
+
+    assertEquals(config1.getTokenLifetime(TimeUnit.MILLISECONDS), 
config2.getTokenLifetime(TimeUnit.MILLISECONDS));
+    assertEquals(config1, config2);
+    assertEquals(config1.hashCode(), config2.hashCode());
+  }
+
+  @Test
+  public void testNoTimeout() {
+    DelegationTokenConfig config = new DelegationTokenConfig();
+
+    config.setTokenLifetime(0, TimeUnit.MILLISECONDS);
+
+    assertEquals(0, config.getTokenLifetime(TimeUnit.MILLISECONDS));
+
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidLifetime() {
+    new DelegationTokenConfig().setTokenLifetime(-1, TimeUnit.DAYS);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testSetInvalidTimeUnit() {
+    new DelegationTokenConfig().setTokenLifetime(5, null);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testGetInvalidTimeUnit() {
+    new DelegationTokenConfig().getTokenLifetime(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializerTest.java
new file mode 100644
index 0000000..4499a58
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/impl/DelegationTokenConfigSerializerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.security.thrift.TDelegationTokenConfig;
+import org.junit.Test;
+
+public class DelegationTokenConfigSerializerTest {
+
+  @Test
+  public void test() {
+    DelegationTokenConfig cfg = new DelegationTokenConfig();
+    cfg.setTokenLifetime(8323, TimeUnit.HOURS);
+
+    TDelegationTokenConfig tCfg = 
DelegationTokenConfigSerializer.serialize(cfg);
+    assertEquals(tCfg.getLifetime(), 
cfg.getTokenLifetime(TimeUnit.MILLISECONDS));
+
+    assertEquals(cfg, DelegationTokenConfigSerializer.deserialize(tCfg));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
index 2723273..04b9ae8 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/impl/ThriftTransportKeyTest.java
@@ -20,17 +20,38 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.easymock.EasyMock;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.net.HostAndPort;
 
 public class ThriftTransportKeyTest {
 
+  @Before
+  public void setup() throws Exception {
+    System.setProperty("java.security.krb5.realm", "accumulo");
+    System.setProperty("java.security.krb5.kdc", "fake");
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
   @Test(expected = RuntimeException.class)
   public void testSslAndSaslErrors() {
     ClientContext clientCtx = createMock(ClientContext.class);
@@ -38,7 +59,7 @@ public class ThriftTransportKeyTest {
     SaslConnectionParams saslParams = createMock(SaslConnectionParams.class);
 
     expect(clientCtx.getClientSslParams()).andReturn(sslParams).anyTimes();
-    expect(clientCtx.getClientSaslParams()).andReturn(saslParams).anyTimes();
+    expect(clientCtx.getSaslParams()).andReturn(saslParams).anyTimes();
 
     // We don't care to verify the sslparam or saslparam mocks
     replay(clientCtx);
@@ -51,20 +72,78 @@ public class ThriftTransportKeyTest {
   }
 
   @Test
-  public void testSaslPrincipalIsSignificant() {
-    SaslConnectionParams saslParams1 = createMock(SaslConnectionParams.class), 
saslParams2 = createMock(SaslConnectionParams.class);
-    expect(saslParams1.getPrincipal()).andReturn("user1");
-    expect(saslParams2.getPrincipal()).andReturn("user2");
+  public void testConnectionCaching() throws IOException, InterruptedException 
{
+    UserGroupInformation user1 = 
UserGroupInformation.createUserForTesting("user1", new String[0]);
+    final KerberosToken token = EasyMock.createMock(KerberosToken.class);
+    final ClientConfiguration clientConf = ClientConfiguration.loadDefault();
+    // The primary is the first component of the principal
+    final String primary = "accumulo";
+    clientConf.withSasl(true, primary);
+
+    // A first instance of the SASL cnxn params
+    SaslConnectionParams saslParams1 = user1.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        return new SaslConnectionParams(clientConf, token);
+      }
+    });
+
+    // A second instance of what should be the same SaslConnectionParams
+    SaslConnectionParams saslParams2 = user1.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        return new SaslConnectionParams(clientConf, token);
+      }
+    });
 
-    replay(saslParams1, saslParams2);
+    ThriftTransportKey ttk1 = new 
ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1l, null, 
saslParams1), ttk2 = new ThriftTransportKey(
+        HostAndPort.fromParts("localhost", 9997), 1l, null, saslParams2);
+
+    // Should equals() and hashCode() to make sure we don't throw away thrift 
cnxns
+    assertEquals(ttk1, ttk2);
+    assertEquals(ttk1.hashCode(), ttk2.hashCode());
+  }
+
+  @Test
+  public void testSaslPrincipalIsSignificant() throws IOException, 
InterruptedException {
+    UserGroupInformation user1 = 
UserGroupInformation.createUserForTesting("user1", new String[0]);
+    final KerberosToken token = EasyMock.createMock(KerberosToken.class);
+    SaslConnectionParams saslParams1 = user1.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(clientConf, token);
+      }
+    });
+
+    UserGroupInformation user2 = 
UserGroupInformation.createUserForTesting("user2", new String[0]);
+    SaslConnectionParams saslParams2 = user2.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(clientConf, token);
+      }
+    });
 
     ThriftTransportKey ttk1 = new 
ThriftTransportKey(HostAndPort.fromParts("localhost", 9997), 1l, null, 
saslParams1), ttk2 = new ThriftTransportKey(
         HostAndPort.fromParts("localhost", 9997), 1l, null, saslParams2);
 
     assertNotEquals(ttk1, ttk2);
     assertNotEquals(ttk1.hashCode(), ttk2.hashCode());
-
-    verify(saslParams1, saslParams2);
   }
 
   @Test
@@ -72,7 +151,7 @@ public class ThriftTransportKeyTest {
     ClientContext clientCtx = createMock(ClientContext.class);
 
     expect(clientCtx.getClientSslParams()).andReturn(null).anyTimes();
-    expect(clientCtx.getClientSaslParams()).andReturn(null).anyTimes();
+    expect(clientCtx.getSaslParams()).andReturn(null).anyTimes();
 
     replay(clientCtx);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/client/security/tokens/DelegationTokenTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/DelegationTokenTest.java
 
b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/DelegationTokenTest.java
new file mode 100644
index 0000000..f66a1ee
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/DelegationTokenTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client.security.tokens;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
+import org.junit.Test;
+
+public class DelegationTokenTest {
+
+  @Test
+  public void testSerialization() throws IOException {
+    AuthenticationTokenIdentifier identifier = new 
AuthenticationTokenIdentifier("user", 1, 1000l, 2000l, "instanceid");
+    // We don't need a real serialized Token for the password
+    DelegationToken token = new DelegationToken(new byte[] {'f', 'a', 'k', 
'e'}, identifier);
+    assertEquals(token, token);
+    assertEquals(token.hashCode(), token.hashCode());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    token.write(new DataOutputStream(baos));
+
+    DelegationToken copy = new DelegationToken();
+    copy.readFields(new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray())));
+
+    assertEquals(token, copy);
+    assertEquals(token.hashCode(), copy.hashCode());
+  }
+
+  @Test
+  public void testEquality() throws IOException {
+    AuthenticationTokenIdentifier identifier = new 
AuthenticationTokenIdentifier("user", 1, 1000l, 2000l, "instanceid");
+    // We don't need a real serialized Token for the password
+    DelegationToken token = new DelegationToken(new byte[] {'f', 'a', 'k', 
'e'}, identifier);
+
+    AuthenticationTokenIdentifier identifier2 = new 
AuthenticationTokenIdentifier("user1", 1, 1000l, 2000l, "instanceid");
+    // We don't need a real serialized Token for the password
+    DelegationToken token2 = new DelegationToken(new byte[] {'f', 'a', 'k', 
'e'}, identifier2);
+
+    assertNotEquals(token, token2);
+    assertNotEquals(token.hashCode(), token2.hashCode());
+
+    // We don't need a real serialized Token for the password
+    DelegationToken token3 = new DelegationToken(new byte[] {'f', 'a', 'k', 
'e', '0'}, identifier);
+
+    assertNotEquals(token, token3);
+    assertNotEquals(token.hashCode(), token3.hashCode());
+    assertNotEquals(token2, token3);
+    assertNotEquals(token2.hashCode(), token3.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/rpc/SaslClientDigestCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/rpc/SaslClientDigestCallbackHandlerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/rpc/SaslClientDigestCallbackHandlerTest.java
new file mode 100644
index 0000000..f38e2e3
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/rpc/SaslClientDigestCallbackHandlerTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.rpc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class SaslClientDigestCallbackHandlerTest {
+
+  @Test
+  public void testEquality() {
+    SaslClientDigestCallbackHandler handler1 = new 
SaslClientDigestCallbackHandler("user", "mypass".toCharArray()), handler2 = new 
SaslClientDigestCallbackHandler(
+        "user", "mypass".toCharArray());
+    assertEquals(handler1, handler2);
+    assertEquals(handler1.hashCode(), handler2.hashCode());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java 
b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
index 3910f34..9b77d25 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/rpc/SaslConnectionParamsTest.java
@@ -17,7 +17,8 @@
 package org.apache.accumulo.core.rpc;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
@@ -27,12 +28,17 @@ import javax.security.sasl.Sasl;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.security.tokens.DelegationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.rpc.SaslConnectionParams.QualityOfProtection;
+import org.apache.accumulo.core.rpc.SaslConnectionParams.SaslMechanism;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,15 +59,37 @@ public class SaslConnectionParamsTest {
   }
 
   @Test
-  public void testNullParams() {
-    ClientConfiguration clientConf = new ClientConfiguration();
-    AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
-    assertEquals("false", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
-    assertNull(SaslConnectionParams.forConfig(rpcConf));
+  public void testDefaultParamsAsClient() throws Exception {
+    final KerberosToken token = EasyMock.createMock(KerberosToken.class);
+    testUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        final SaslConnectionParams saslParams = new 
SaslConnectionParams(clientConf, token);
+        assertEquals(primary, saslParams.getKerberosServerPrimary());
+
+        final QualityOfProtection defaultQop = 
QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
+        assertEquals(defaultQop, saslParams.getQualityOfProtection());
+
+        Map<String,String> properties = saslParams.getSaslProperties();
+        assertEquals(1, properties.size());
+        assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP));
+        assertEquals(username, saslParams.getPrincipal());
+        return null;
+      }
+    });
   }
 
   @Test
-  public void testDefaultParamsAsClient() throws Exception {
+  public void testDefaultParams() throws Exception {
+    final KerberosToken token = EasyMock.createMock(KerberosToken.class);
     testUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -71,9 +99,10 @@ public class SaslConnectionParamsTest {
         final String primary = "accumulo";
         clientConf.withSasl(true, primary);
 
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
         assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
 
-        final SaslConnectionParams saslParams = 
SaslConnectionParams.forConfig(clientConf);
+        final SaslConnectionParams saslParams = new 
SaslConnectionParams(rpcConf, token);
         assertEquals(primary, saslParams.getKerberosServerPrimary());
 
         final QualityOfProtection defaultQop = 
QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
@@ -89,7 +118,8 @@ public class SaslConnectionParamsTest {
   }
 
   @Test
-  public void testDefaultParamsAsServer() throws Exception {
+  public void testDelegationToken() throws Exception {
+    final DelegationToken token = new DelegationToken(new byte[0], new 
AuthenticationTokenIdentifier("user", 1, 10l, 20l, "instanceid"));
     testUser.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -102,12 +132,16 @@ public class SaslConnectionParamsTest {
         final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
         assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
 
-        final SaslConnectionParams saslParams = 
SaslConnectionParams.forConfig(rpcConf);
+        final SaslConnectionParams saslParams = new 
SaslConnectionParams(rpcConf, token);
         assertEquals(primary, saslParams.getKerberosServerPrimary());
 
         final QualityOfProtection defaultQop = 
QualityOfProtection.get(Property.RPC_SASL_QOP.getDefaultValue());
         assertEquals(defaultQop, saslParams.getQualityOfProtection());
 
+        assertEquals(SaslMechanism.DIGEST_MD5, saslParams.getMechanism());
+        assertNotNull(saslParams.getCallbackHandler());
+        assertEquals(SaslClientDigestCallbackHandler.class, 
saslParams.getCallbackHandler().getClass());
+
         Map<String,String> properties = saslParams.getSaslProperties();
         assertEquals(1, properties.size());
         assertEquals(defaultQop.getQuality(), properties.get(Sasl.QOP));
@@ -117,4 +151,89 @@ public class SaslConnectionParamsTest {
     });
   }
 
+  @Test
+  public void testEquality() throws Exception {
+    final KerberosToken token = EasyMock.createMock(KerberosToken.class);
+    SaslConnectionParams params1 = testUser.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(rpcConf, token);
+      }
+    });
+
+    SaslConnectionParams params2 = testUser.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(rpcConf, token);
+      }
+    });
+
+    assertEquals(params1, params2);
+    assertEquals(params1.hashCode(), params2.hashCode());
+
+    final DelegationToken delToken1 = new DelegationToken(new byte[0], new 
AuthenticationTokenIdentifier("user", 1, 10l, 20l, "instanceid"));
+    SaslConnectionParams params3 = testUser.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(rpcConf, delToken1);
+      }
+    });
+
+    assertNotEquals(params1, params3);
+    assertNotEquals(params1.hashCode(), params3.hashCode());
+    assertNotEquals(params2, params3);
+    assertNotEquals(params2.hashCode(), params3.hashCode());
+
+    final DelegationToken delToken2 = new DelegationToken(new byte[0], new 
AuthenticationTokenIdentifier("user", 1, 10l, 20l, "instanceid"));
+    SaslConnectionParams params4 = testUser.doAs(new 
PrivilegedExceptionAction<SaslConnectionParams>() {
+      @Override
+      public SaslConnectionParams run() throws Exception {
+        final ClientConfiguration clientConf = 
ClientConfiguration.loadDefault();
+
+        // The primary is the first component of the principal
+        final String primary = "accumulo";
+        clientConf.withSasl(true, primary);
+
+        final AccumuloConfiguration rpcConf = 
ClientContext.convertClientConfig(clientConf);
+        assertEquals("true", 
clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED));
+
+        return new SaslConnectionParams(rpcConf, delToken2);
+      }
+    });
+
+    assertNotEquals(params1, params4);
+    assertNotEquals(params1.hashCode(), params4.hashCode());
+    assertNotEquals(params2, params4);
+    assertNotEquals(params2.hashCode(), params4.hashCode());
+
+    assertEquals(params3, params4);
+    assertEquals(params3.hashCode(), params4.hashCode());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/security/AuthenticationTokenIdentifierTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/security/AuthenticationTokenIdentifierTest.java
 
b/core/src/test/java/org/apache/accumulo/core/security/AuthenticationTokenIdentifierTest.java
new file mode 100644
index 0000000..d3c1f20
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/security/AuthenticationTokenIdentifierTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+public class AuthenticationTokenIdentifierTest {
+
+  @Test
+  public void testUgi() {
+    String principal = "user";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    UserGroupInformation actual = token.getUser(), expected = 
UserGroupInformation.createRemoteUser(principal);
+    assertEquals(expected.getAuthenticationMethod(), 
actual.getAuthenticationMethod());
+    assertEquals(expected.getUserName(), expected.getUserName());
+  }
+
+  @Test
+  public void testEquality() {
+    String principal = "user";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    assertEquals(token, token);
+    AuthenticationTokenIdentifier newToken = new 
AuthenticationTokenIdentifier(principal);
+    assertEquals(token, newToken);
+    assertEquals(token.hashCode(), newToken.hashCode());
+  }
+
+  @Test
+  public void testExtendedEquality() {
+    String principal = "user";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    assertEquals(token, token);
+    AuthenticationTokenIdentifier newToken = new 
AuthenticationTokenIdentifier(principal, 1, 5l, 10l, "uuid");
+    assertNotEquals(token, newToken);
+    assertNotEquals(token.hashCode(), newToken.hashCode());
+    AuthenticationTokenIdentifier dblNewToken = new 
AuthenticationTokenIdentifier(principal);
+    dblNewToken.setKeyId(1);
+    dblNewToken.setIssueDate(5l);
+    dblNewToken.setExpirationDate(10l);
+    dblNewToken.setInstanceId("uuid");
+  }
+
+  @Test
+  public void testToString() {
+    String principal = "my_special_principal";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    assertTrue(token.toString().contains(principal));
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    String principal = "my_special_principal";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    token.write(out);
+    DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    AuthenticationTokenIdentifier deserializedToken = new 
AuthenticationTokenIdentifier();
+    deserializedToken.readFields(in);
+    assertEquals(token, deserializedToken);
+    assertEquals(token.hashCode(), deserializedToken.hashCode());
+    assertEquals(token.toString(), deserializedToken.toString());
+  }
+
+  @Test
+  public void testTokenKind() {
+    String principal = "my_special_principal";
+    AuthenticationTokenIdentifier token = new 
AuthenticationTokenIdentifier(principal);
+    assertEquals(AuthenticationTokenIdentifier.TOKEN_KIND, token.getKind());
+  }
+
+  @Test
+  public void testNullMsg() throws IOException {
+    AuthenticationTokenIdentifier token = new AuthenticationTokenIdentifier();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    token.write(out);
+    DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(baos.toByteArray()));
+    AuthenticationTokenIdentifier deserializedToken = new 
AuthenticationTokenIdentifier();
+    deserializedToken.readFields(in);
+    assertEquals(token, deserializedToken);
+    assertEquals(token.hashCode(), deserializedToken.hashCode());
+    assertEquals(token.toString(), deserializedToken.toString());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java 
b/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java
new file mode 100644
index 0000000..765d9ca
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/util/ThriftMessageUtilTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.accumulo.core.security.thrift.TAuthenticationTokenIdentifier;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThriftMessageUtilTest {
+
+  private TAuthenticationTokenIdentifier msg;
+  private ThriftMessageUtil util;
+
+  @Before
+  public void setup() {
+    msg = new TAuthenticationTokenIdentifier("principal");
+    util = new ThriftMessageUtil();
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    ByteBuffer buff = util.serialize(msg);
+    TAuthenticationTokenIdentifier bbMsg = new 
TAuthenticationTokenIdentifier();
+    util.deserialize(buff, bbMsg);
+    assertEquals(msg, bbMsg);
+  }
+
+  @Test
+  public void testSerializationAsByteArray() throws IOException {
+    ByteBuffer buff = util.serialize(msg);
+    TAuthenticationTokenIdentifier copy = new TAuthenticationTokenIdentifier();
+    byte[] array = new byte[buff.limit()];
+    System.arraycopy(buff.array(), 0, array, 0, buff.limit());
+    util.deserialize(array, copy);
+    assertEquals(msg, copy);
+  }
+
+  @Test
+  public void testSerializationAsByteArrayWithLimits() throws IOException {
+    ByteBuffer buff = util.serialize(msg);
+    TAuthenticationTokenIdentifier copy = new TAuthenticationTokenIdentifier();
+
+    byte[] array = new byte[buff.limit() + 14];
+    // Throw some garbage in front and behind the actual message
+    array[0] = 'G';
+    array[1] = 'A';
+    array[2] = 'R';
+    array[3] = 'B';
+    array[4] = 'A';
+    array[5] = 'G';
+    array[6] = 'E';
+    System.arraycopy(buff.array(), 0, array, 7, buff.limit());
+    array[7 + buff.limit()] = 'G';
+    array[7 + buff.limit() + 1] = 'A';
+    array[7 + buff.limit() + 2] = 'R';
+    array[7 + buff.limit() + 3] = 'B';
+    array[7 + buff.limit() + 4] = 'A';
+    array[7 + buff.limit() + 5] = 'G';
+    array[7 + buff.limit() + 6] = 'E';
+
+    util.deserialize(array, 7, buff.limit(), copy);
+    assertEquals(msg, copy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/docs/src/main/asciidoc/chapters/kerberos.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/kerberos.txt 
b/docs/src/main/asciidoc/chapters/kerberos.txt
index 05d7384..dc2484b 100644
--- a/docs/src/main/asciidoc/chapters/kerberos.txt
+++ b/docs/src/main/asciidoc/chapters/kerberos.txt
@@ -73,6 +73,27 @@ password, at the cost of needing to protect the keytab file. 
These principals
 will apply directly to authentication for clients accessing Accumulo and the
 Accumulo processes accessing HDFS.
 
+=== Delegation Tokens
+
+MapReduce, a common way that clients interact with Accumulo, does not map well 
to the
+client-server model that Kerberos was originally designed to support. 
Specifically, the parallelization
+of tasks across many nodes introduces the problem of securely sharing the user 
credentials across
+these tasks in as safe a manner as possible. To address this problem, Hadoop 
introduced the notion
+of a delegation token to be used in distributed execution settings.
+
+A delegation token is nothing more than a short-term, on-the-fly password 
generated after authenticating with the user's
+credentials.  In Hadoop itself, the Namenode and ResourceManager, for HDFS and 
YARN respectively, act as the gateway for
+delegation tokens requests. For example, before a YARN job is submitted, the 
implementation will request delegation
+tokens from the NameNode and ResourceManager so the YARN tasks can communicate 
with HDFS and YARN. In the same manner,
+support has been added in the Accumulo Master to generate delegation tokens to 
enable interaction with Accumulo via
+MapReduce when Kerberos authentication is enabled in a manner similar to HDFS 
and YARN.
+
+Generating an expiring password is, arguably, more secure than distributing 
the user's
+credentials across the cluster as only access to HDFS, YARN and Accumulo would 
be
+compromised in the case of the token being compromised as opposed to the entire
+Kerberos credential. Additional details for clients and servers will be covered
+in subsequent sections.
+
 === Configuring Accumulo
 
 To configure Accumulo for use with Kerberos, both client-facing and 
server-facing
@@ -149,6 +170,12 @@ serializing traces to the trace table.
 still use a normal KerberosToken and the same keytab/principal to serialize 
traces. Like
 non-Kerberized instances, the table must be created and permissions granted to 
the trace.user.
 ** The same +_HOST+ replacement is performed on this value, substituted the 
FQDN for +_HOST+.
+* *general.delegation.token.lifetime*=_7d_
+** The length of time that the server-side secret used to create delegation 
tokens is valid.
+   After a server-side secret expires, a delegation token created with that 
secret is no longer valid.
+* *general.delegation.token.update.interval*=_1d_
+** The frequency in which new server-side secrets should be generated to 
create delegation
+   tokens for clients. Generating new secrets reduces the likelihood of 
cryptographic attacks.
 
 Although it should be a prerequisite, it is ever important that you have DNS 
properly
 configured for your nodes and that Accumulo is configured to use the FQDN. It
@@ -220,6 +247,34 @@ requests from.
 Both the hosts and users configuration properties also accept a value of +*+ 
to denote that any user or host
 is acceptable for +$PROXY_USER+.
 
+===== Delegation Tokens
+
+Within Accumulo services, the primary task to implement delegation tokens is 
the generation and distribution
+of a shared secret among all Accumulo tabletservers and the master. The secret 
key allows for generation
+of delegation tokens for users and verification of delegation tokens presented 
by clients. If a server
+process is unaware of the secret key used to create a delegation token, the 
client cannot be authenticated.
+As ZooKeeper distribution is an asynchronous operation (typically on the order 
of seconds), the 
+value for `general.delegation.token.update.interval` should be on the order of 
hours to days to reduce the
+likelihood of servers rejecting valid clients because the server did not yet 
see a new secret key.
+
+Supporting authentication with both Kerberos credentials and delegation 
tokens, the SASL thrift
+server accepts connections with either `GSSAPI` and `DIGEST-MD5` mechanisms 
set. The `DIGEST-MD5` mechanism
+enables authentication as a normal username and password exchange which 
`DelegationToken`s leverages.
+
+Since delegation tokens are a weaker form of authentication than Kerberos 
credentials, user access
+to obtain delegation tokens from Accumulo is protected with the 
`DELEGATION_TOKEN` system permission. Only
+users with the system permission are allowed to obtain delegation tokens. It 
is also recommended
+to configure confidentiality with SASL, using the `rpc.sasl.qop=auth-conf` 
configuration property, to
+ensure that prying eyes cannot view the `DelegationToken` as it passes over 
the network.
+
+----
+# Check a user's permissions
+admin@REALM@accumulo> userpermissions -u user@REALM
+
+# Grant the DELEGATION_TOKEN system permission to a user
+admin@REALM@accumulo> grant System.DELEGATION_TOKEN -s -u user@REALM
+----
+
 ==== Clients
 
 ===== Create client principal
@@ -265,6 +320,61 @@ Three items need to be set to enable access to Accumulo:
 The second and third properties *must* match the configuration of the accumulo 
servers; this is
 required to set up the SASL transport.
 
+===== DelegationTokens with MapReduce
+
+To use DelegationTokens in a custom MapReduce job, the call to 
`setConnectorInfo()` method
+on `AccumuloInputFormat` or `AccumuloOutputFormat` should be the only 
necessary change. Instead
+of providing an instance of a `KerberosToken`, the user must call 
`SecurityOperations.getDelegationToken`
+using a `Connector` obtained with that `KerberosToken`, and pass the 
`DelegationToken` to
+`setConnectorInfo` instead of the `KerberosToken`. It is expected that the 
user launching
+the MapReduce job is already logged in via Kerberos via a keytab or via a 
locally-cached
+Kerberos ticket-granting-ticket (TGT).
+
+[source,java]
+----
+Instance instance = getInstance();
+KerberosToken kt = new KerberosToken();
+Connector conn = instance.getConnector(principal, kt);
+DelegationToken dt = conn.securityOperations().getDelegationToken();
+
+// Reading from Accumulo
+AccumuloInputFormat.setConnectorInfo(job, principal, dt);
+
+// Writing to Accumulo
+AccumuloOutputFormat.setConnectorInfo(job, principal, dt);
+----
+
+If the user passes a `KerberosToken` to the `setConnectorInfo` method, the 
implementation will
+attempt to obtain a `DelegationToken` automatically, but this does have 
limitations
+based on the other MapReduce configuration methods already called and 
permissions granted
+to the calling user. It is best for the user to acquire the DelegationToken on 
their own
+and provide it directly to `setConnectorInfo`.
+
+Users must have the `DELEGATION_TOKEN` system permission to call the 
`getDelegationToken`
+method. The obtained delegation token is only valid for the requesting user 
for a period
+of time dependent on Accumulo's configuration 
(`general.delegation.token.lifetime`).
+
+It is also possible to obtain and use `DelegationToken`s outside of the context
+of MapReduce.
+
+[source,java]
+----
+String principal = "user@REALM";
+Instance instance = getInstance();
+Connector connector = instance.getConnector(principal, new KerberosToken());
+DelegationToken delegationToken = 
connector.securityOperations().getDelegationToken();
+
+Connector dtConnector = instance.getConnector(principal, delegationToken);
+----
+
+Use of the `dtConnector` will perform each operation as the original user, but 
without
+their Kerberos credentials.
+
+For the duration of validity of the `DelegationToken`, the user *must* take 
the necessary precautions
+to protect the `DelegationToken` from prying eyes as it can be used by any 
user on any host to impersonate
+the user who requested the `DelegationToken`. YARN ensures that passing the 
delegation token from the client
+JVM to each YARN task is secure, even in multi-tenant instances.
+
 ==== Debugging
 
 *Q*: I have valid Kerberos credentials and a correct client configuration file 
but 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
----------------------------------------------------------------------
diff --git 
a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java 
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
index 610b1bd..1923582 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReader.java
@@ -20,6 +20,7 @@ import java.util.List;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 public interface IZooReader {
@@ -28,6 +29,8 @@ public interface IZooReader {
 
   byte[] getData(String zPath, boolean watch, Stat stat) throws 
KeeperException, InterruptedException;
 
+  byte[] getData(String zPath, Watcher watcher, Stat stat) throws 
KeeperException, InterruptedException;
+
   Stat getStatus(String zPath) throws KeeperException, InterruptedException;
 
   Stat getStatus(String zPath, Watcher watcher) throws KeeperException, 
InterruptedException;
@@ -42,4 +45,5 @@ public interface IZooReader {
 
   void sync(final String path) throws KeeperException, InterruptedException;
 
+  List<ACL> getACL(String zPath, Stat stat) throws KeeperException, 
InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
----------------------------------------------------------------------
diff --git 
a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java 
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
index 5706cf3..707959c 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReader.java
@@ -20,12 +20,14 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.accumulo.fate.zookeeper.ZooUtil.ZooKeeperConnectionInfo;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 public class ZooReader implements IZooReader {
@@ -34,6 +36,7 @@ public class ZooReader implements IZooReader {
   protected String keepers;
   protected int timeout;
   private final RetryFactory retryFactory;
+  private final ZooKeeperConnectionInfo info;
 
   protected ZooKeeper getSession(String keepers, int timeout, String scheme, 
byte[] auth) {
     return ZooSession.getSession(keepers, timeout, scheme, auth);
@@ -83,6 +86,25 @@ public class ZooReader implements IZooReader {
   }
 
   @Override
+  public byte[] getData(String zPath, Watcher watcher, Stat stat) throws 
KeeperException, InterruptedException {
+    final Retry retry = getRetryFactory().create();
+    while (true) {
+      try {
+        return getZooKeeper().getData(zPath, watcher, stat);
+      } catch (KeeperException e) {
+        final Code code = e.code();
+        if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || 
code == Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
+  }
+
+  @Override
   public Stat getStatus(String zPath) throws KeeperException, 
InterruptedException {
     final Retry retry = getRetryFactory().create();
     while (true) {
@@ -220,9 +242,15 @@ public class ZooReader implements IZooReader {
     }
   }
 
+  @Override
+  public List<ACL> getACL(String zPath, Stat stat) throws KeeperException, 
InterruptedException {
+    return ZooUtil.getACL(info, zPath, stat);
+  }
+
   public ZooReader(String keepers, int timeout) {
     this.keepers = keepers;
     this.timeout = timeout;
     this.retryFactory = RetryFactory.DEFAULT_INSTANCE;
+    this.info = new ZooKeeperConnectionInfo(keepers, timeout, null, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java 
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
index 805bfff..abb1aeb 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
@@ -518,4 +518,22 @@ public class ZooUtil {
     }
   }
 
+  public static List<ACL> getACL(ZooKeeperConnectionInfo info, String zPath, 
Stat stat) throws KeeperException, InterruptedException {
+    final Retry retry = RETRY_FACTORY.create();
+    while (true) {
+      try {
+        return getZooKeeper(info).getACL(zPath, stat);
+      } catch (KeeperException e) {
+        final Code c = e.code();
+        if (c == Code.CONNECTIONLOSS || c == Code.OPERATIONTIMEOUT || c == 
Code.SESSIONEXPIRED) {
+          retryOrThrow(retry, e);
+        } else {
+          throw e;
+        }
+      }
+
+      retry.waitForNextAttempt();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java 
b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index f9039be..e97481c 100644
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -25,15 +25,17 @@ import java.util.Properties;
 
 import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.proxy.thrift.AccumuloProxy;
 import org.apache.accumulo.server.metrics.MetricsFactory;
 import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftServerType;
@@ -204,16 +206,15 @@ public class Proxy implements KeywordExecutable {
 
     ClientConfiguration clientConf = ClientConfiguration.loadDefault();
     SslConnectionParams sslParams = null;
-    SaslConnectionParams saslParams = null;
+    SaslServerConnectionParams saslParams = null;
     switch (serverType) {
       case SSL:
         sslParams = 
SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf));
         break;
       case SASL:
-        saslParams = SaslConnectionParams.forConfig(clientConf);
-        if (null == saslParams) {
+        if 
(!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey())) {
           log.fatal("SASL thrift server was requested but it is disabled in 
client configuration");
-          throw new RuntimeException();
+          throw new RuntimeException("SASL is not enabled in configuration");
         }
 
         // Kerberos needs to be enabled to use it
@@ -233,6 +234,9 @@ public class Proxy implements KeywordExecutable {
         UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
         log.info("Logged in as " + ugi.getUserName());
 
+        KerberosToken token = new KerberosToken();
+        saslParams = new SaslServerConnectionParams(clientConf, token, null);
+
         processor = new UGIAssumingProcessor(processor);
 
         break;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
 
b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
index 84c3853..6a59822 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/AccumuloServerContext.java
@@ -18,19 +18,26 @@ package org.apache.accumulo.server;
 
 import java.io.IOException;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.ConnectorImpl;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.rpc.SaslConnectionParams;
 import org.apache.accumulo.core.rpc.SslConnectionParams;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
 import org.apache.accumulo.server.rpc.ThriftServerType;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.SystemCredentials;
+import 
org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
@@ -41,14 +48,23 @@ import com.google.common.base.Preconditions;
 public class AccumuloServerContext extends ClientContext {
 
   private final ServerConfigurationFactory confFactory;
+  private AuthenticationTokenSecretManager secretManager;
 
   /**
    * Construct a server context from the server's configuration
    */
   public AccumuloServerContext(ServerConfigurationFactory confFactory) {
+    this(confFactory, null);
+  }
+
+  /**
+   * Construct a server context from the server's configuration
+   */
+  public AccumuloServerContext(ServerConfigurationFactory confFactory, 
AuthenticationTokenSecretManager secretManager) {
     super(confFactory.getInstance(), 
getCredentials(confFactory.getInstance()), confFactory.getConfiguration());
     this.confFactory = confFactory;
-    if (null != getServerSaslParams()) {
+    this.secretManager = secretManager;
+    if (null != getSaslParams()) {
       // Server-side "client" check to make sure we're logged in as a user we 
expect to be
       enforceKerberosLogin();
     }
@@ -65,7 +81,7 @@ public class AccumuloServerContext extends ClientContext {
     UserGroupInformation loginUser;
     try {
       // The system user should be logged in via keytab when the process is 
started, not the currentUser() like KerberosToken
-      loginUser = UserGroupInformation.getLoginUser();
+      loginUser = UserGroupInformation.getCurrentUser();
     } catch (IOException e) {
       throw new RuntimeException("Could not get login user", e);
     }
@@ -99,9 +115,13 @@ public class AccumuloServerContext extends ClientContext {
     return SslConnectionParams.forServer(getConfiguration());
   }
 
-  public SaslConnectionParams getServerSaslParams() {
-    // Not functionally different than the client SASL params, just uses the 
site configuration
-    return 
SaslConnectionParams.forConfig(getServerConfigurationFactory().getSiteConfiguration());
+  @Override
+  public SaslServerConnectionParams getSaslParams() {
+    AccumuloConfiguration conf = 
getServerConfigurationFactory().getSiteConfiguration();
+    if (!conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
+      return null;
+    }
+    return new SaslServerConnectionParams(conf, getCredentials().getToken(), 
secretManager);
   }
 
   /**
@@ -130,4 +150,28 @@ public class AccumuloServerContext extends ClientContext {
     }
   }
 
+  public void setSecretManager(AuthenticationTokenSecretManager secretManager) 
{
+    this.secretManager = secretManager;
+  }
+
+  public AuthenticationTokenSecretManager getSecretManager() {
+    return secretManager;
+  }
+
+  // Need to override this from ClientContext to ensure that HdfsZooInstance 
doesn't "downcast"
+  // the AccumuloServerContext into a ClientContext (via the copy-constructor 
on ClientContext)
+  @Override
+  public Connector getConnector() throws AccumuloException, 
AccumuloSecurityException {
+    // avoid making more connectors than necessary
+    if (conn == null) {
+      if (inst instanceof ZooKeeperInstance || inst instanceof 
HdfsZooInstance) {
+        // reuse existing context
+        conn = new ConnectorImpl(this);
+      } else {
+        Credentials c = getCredentials();
+        conn = getInstance().getConnector(c.getPrincipal(), c.getToken());
+      }
+    }
+    return conn;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index bf56a7a..7ee6f0c 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -30,7 +30,6 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.server.AccumuloServerContext;
 
 public class MetaDataStateStore extends TabletStateStore {
-  // private static final Logger log = 
Logger.getLogger(MetaDataStateStore.class);
 
   private static final int THREADS = 4;
   private static final int LATENCY = 1000;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerConnectionParams.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerConnectionParams.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerConnectionParams.java
new file mode 100644
index 0000000..dc0b81a
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerConnectionParams.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.rpc.SaslConnectionParams;
+import org.apache.accumulo.server.security.SystemCredentials.SystemToken;
+import 
org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
+
+/**
+ * Server-side SASL connection information
+ */
+public class SaslServerConnectionParams extends SaslConnectionParams {
+
+  private AuthenticationTokenSecretManager secretManager;
+
+  public SaslServerConnectionParams(AccumuloConfiguration conf, 
AuthenticationToken token) {
+    this(conf, token, null);
+  }
+
+  public SaslServerConnectionParams(AccumuloConfiguration conf, 
AuthenticationToken token, AuthenticationTokenSecretManager secretManager) {
+    super(conf, token);
+    setSecretManager(secretManager);
+  }
+
+  public SaslServerConnectionParams(ClientConfiguration conf, 
AuthenticationToken token) {
+    this(conf, token, null);
+  }
+
+  public SaslServerConnectionParams(ClientConfiguration conf, 
AuthenticationToken token, AuthenticationTokenSecretManager secretManager) {
+    super(conf, token);
+    setSecretManager(secretManager);
+  }
+
+  @Override
+  protected void updateFromToken(AuthenticationToken token) {
+    // Servers should never have a delegation token -- only a strong kerberos 
identity
+    if (token instanceof KerberosToken || token instanceof SystemToken) {
+      mechanism = SaslMechanism.GSSAPI;
+    } else {
+      throw new IllegalArgumentException("Cannot determine SASL mechanism for 
token class: " + token.getClass());
+    }
+  }
+
+  public AuthenticationTokenSecretManager getSecretManager() {
+    return secretManager;
+  }
+
+  public void setSecretManager(AuthenticationTokenSecretManager secretManager) 
{
+    this.secretManager = secretManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c983317/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerDigestCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerDigestCallbackHandler.java
 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerDigestCallbackHandler.java
new file mode 100644
index 0000000..c43f7ed
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/rpc/SaslServerDigestCallbackHandler.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+
+import org.apache.accumulo.core.rpc.SaslDigestCallbackHandler;
+import org.apache.accumulo.core.security.AuthenticationTokenIdentifier;
+import 
org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CallbackHandler for SASL DIGEST-MD5 mechanism. Modified copy from Hadoop, 
uses our TokenIdentifier and SecretManager implementations
+ */
+public class SaslServerDigestCallbackHandler extends SaslDigestCallbackHandler 
{
+  private static final Logger log = 
LoggerFactory.getLogger(SaslServerDigestCallbackHandler.class);
+  private static final String NAME = 
SaslServerDigestCallbackHandler.class.getSimpleName();
+
+  private AuthenticationTokenSecretManager secretManager;
+
+  public SaslServerDigestCallbackHandler(AuthenticationTokenSecretManager 
secretManager) {
+    this.secretManager = secretManager;
+  }
+
+  private AuthenticationTokenIdentifier getIdentifier(String id, 
AuthenticationTokenSecretManager secretManager) throws InvalidToken {
+    byte[] tokenId = decodeIdentifier(id);
+    AuthenticationTokenIdentifier tokenIdentifier = 
secretManager.createIdentifier();
+    try {
+      tokenIdentifier.readFields(new DataInputStream(new 
ByteArrayInputStream(tokenId)));
+    } catch (IOException e) {
+      throw (InvalidToken) new InvalidToken("Can't de-serialize 
tokenIdentifier").initCause(e);
+    }
+    return tokenIdentifier;
+  }
+
+  @Override
+  public void handle(Callback[] callbacks) throws InvalidToken, 
UnsupportedCallbackException {
+    NameCallback nc = null;
+    PasswordCallback pc = null;
+    AuthorizeCallback ac = null;
+    for (Callback callback : callbacks) {
+      if (callback instanceof AuthorizeCallback) {
+        ac = (AuthorizeCallback) callback;
+      } else if (callback instanceof NameCallback) {
+        nc = (NameCallback) callback;
+      } else if (callback instanceof PasswordCallback) {
+        pc = (PasswordCallback) callback;
+      } else if (callback instanceof RealmCallback) {
+        continue; // realm is ignored
+      } else {
+        throw new UnsupportedCallbackException(callback, "Unrecognized SASL 
DIGEST-MD5 Callback");
+      }
+    }
+
+    if (pc != null) {
+      AuthenticationTokenIdentifier tokenIdentifier = 
getIdentifier(nc.getDefaultName(), secretManager);
+      char[] password = getPassword(secretManager, tokenIdentifier);
+      UserGroupInformation user = null;
+      user = tokenIdentifier.getUser();
+
+      // Set the principal since we already deserialized the token identifier
+      
UGIAssumingProcessor.getRpcPrincipalThreadLocal().set(user.getUserName());
+
+      log.trace("SASL server DIGEST-MD5 callback: setting password for client: 
{}", tokenIdentifier.getUser());
+      pc.setPassword(password);
+    }
+    if (ac != null) {
+      String authid = ac.getAuthenticationID();
+      String authzid = ac.getAuthorizationID();
+      if (authid.equals(authzid)) {
+        ac.setAuthorized(true);
+      } else {
+        ac.setAuthorized(false);
+      }
+      if (ac.isAuthorized()) {
+        String username = getIdentifier(authzid, 
secretManager).getUser().getUserName();
+        log.trace("SASL server DIGEST-MD5 callback: setting canonicalized 
client ID: {}", username);
+        ac.setAuthorizedID(authzid);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return NAME;
+  }
+}

Reply via email to