http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java new file mode 100644 index 0000000..b047f1a --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthorizor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.security.handler; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.Base64; + +/** + * Kerberos principals might contain identifiers that are not valid ZNodes ('/'). 
Base64-encodes the principals before interacting with ZooKeeper. + */ +public class KerberosAuthorizor implements Authorizor { + + private static KerberosAuthorizor INST; + + public static synchronized KerberosAuthorizor getInstance() { + if (INST == null) + INST = new KerberosAuthorizor(); + return INST; + } + + private final ZKAuthorizor zkAuthorizor; + + public KerberosAuthorizor() { + zkAuthorizor = new ZKAuthorizor(); + } + + @Override + public void initialize(String instanceId, boolean initialize) { + zkAuthorizor.initialize(instanceId, initialize); + } + + @Override + public boolean validSecurityHandlers(Authenticator auth, PermissionHandler pm) { + return auth instanceof KerberosAuthenticator && pm instanceof KerberosPermissionHandler; + } + + @Override + public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException { + zkAuthorizor.initializeSecurity(credentials, Base64.encodeBase64String(rootuser.getBytes(UTF_8))); + } + + @Override + public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException { + zkAuthorizor.changeAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8)), authorizations); + } + + @Override + public Authorizations getCachedUserAuthorizations(String user) throws AccumuloSecurityException { + return zkAuthorizor.getCachedUserAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8))); + } + + @Override + public boolean isValidAuthorizations(String user, List<ByteBuffer> list) throws AccumuloSecurityException { + return zkAuthorizor.isValidAuthorizations(Base64.encodeBase64String(user.getBytes(UTF_8)), list); + } + + @Override + public void initUser(String user) throws AccumuloSecurityException { + zkAuthorizor.initUser(Base64.encodeBase64String(user.getBytes(UTF_8))); + } + + @Override + public void dropUser(String user) throws AccumuloSecurityException { + user = Base64.encodeBase64String(user.getBytes(UTF_8)); 
+ zkAuthorizor.dropUser(user); + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java new file mode 100644 index 0000000..691c555 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosPermissionHandler.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.security.handler; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.security.NamespacePermission; +import org.apache.accumulo.core.security.SystemPermission; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.util.Base64; + +/** + * Kerberos principals might contain identifiers that are not valid ZNodes ('/'). Base64-encodes the principals before interacting with ZooKeeper. Every call is delegated to a wrapped ZKPermHandler; only user principals are encoded, while table and namespace names are passed through unchanged. + */ +public class KerberosPermissionHandler implements PermissionHandler { + + private static KerberosPermissionHandler INST; + + public static synchronized KerberosPermissionHandler getInstance() { + if (INST == null) + INST = new KerberosPermissionHandler(); + return INST; + } + + private final ZKPermHandler zkPermissionHandler; + + public KerberosPermissionHandler() { + zkPermissionHandler = new ZKPermHandler(); + } + + @Override + public void initialize(String instanceId, boolean initialize) { + zkPermissionHandler.initialize(instanceId, initialize); + } + + @Override + public boolean validSecurityHandlers(Authenticator authent, Authorizor author) { + return authent instanceof KerberosAuthenticator && author instanceof KerberosAuthorizor; + } + + @Override + public void initializeSecurity(TCredentials credentials, String rootuser) throws AccumuloSecurityException, ThriftSecurityException { + zkPermissionHandler.initializeSecurity(credentials, Base64.encodeBase64String(rootuser.getBytes(UTF_8))); + } + + @Override + public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { + return 
zkPermissionHandler.hasSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission); + } + + @Override + public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { + return zkPermissionHandler.hasCachedSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission); + } + + @Override + public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException { + return zkPermissionHandler.hasTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission); + } + + @Override + public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException { + return zkPermissionHandler.hasCachedTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission); + } + + @Override + public boolean hasNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException, + NamespaceNotFoundException { + return zkPermissionHandler.hasNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission); + } + + @Override + public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException, + NamespaceNotFoundException { + return zkPermissionHandler.hasCachedNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission); + } + + @Override + public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { + zkPermissionHandler.grantSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission); + } + + @Override + public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { + 
zkPermissionHandler.revokeSystemPermission(Base64.encodeBase64String(user.getBytes(UTF_8)), permission); + } + + @Override + public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException { + zkPermissionHandler.grantTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission); + } + + @Override + public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException { + zkPermissionHandler.revokeTablePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), table, permission); + } + + @Override + public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException, + NamespaceNotFoundException { + zkPermissionHandler.grantNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission); + } + + @Override + public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException, + NamespaceNotFoundException { + zkPermissionHandler.revokeNamespacePermission(Base64.encodeBase64String(user.getBytes(UTF_8)), namespace, permission); + } + + @Override + public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException { + zkPermissionHandler.cleanTablePermissions(table); + } + + @Override + public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException, NamespaceNotFoundException { + zkPermissionHandler.cleanNamespacePermissions(namespace); + } + + @Override + public void initUser(String user) throws AccumuloSecurityException { + zkPermissionHandler.initUser(Base64.encodeBase64String(user.getBytes(UTF_8))); + } + + @Override + public void initTable(String table) throws AccumuloSecurityException { + zkPermissionHandler.initTable(table); + } + + @Override + public void 
cleanUser(String user) throws AccumuloSecurityException { + zkPermissionHandler.cleanUser(Base64.encodeBase64String(user.getBytes(UTF_8))); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java new file mode 100644 index 0000000..4e4f8ce --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/thrift/UGIAssumingProcessor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.thrift; + +import java.io.IOException; + +import javax.security.sasl.SaslServer; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Processor that pulls the SaslServer object out of the transport, and assumes the remote user's UGI before calling through to the original processor. + * + * This is used on the server side to set the UGI for each specific call. + * + * Lifted from Apache Hive 0.14 + */ +public class UGIAssumingProcessor implements TProcessor { + private static final Logger log = LoggerFactory.getLogger(UGIAssumingProcessor.class); + + public static final ThreadLocal<String> principal = new ThreadLocal<String>(); + private final TProcessor wrapped; + private final UserGroupInformation loginUser; + + public UGIAssumingProcessor(TProcessor wrapped) { + this.wrapped = wrapped; + try { + this.loginUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + log.error("Failed to obtain login user", e); + throw new RuntimeException("Failed to obtain login user", e); + } + } + + /** + * The principal of the user who authenticated over SASL. 
+ */ + public static String currentPrincipal() { + return principal.get(); + } + + @Override + public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException { + TTransport trans = inProt.getTransport(); + if (!(trans instanceof TSaslServerTransport)) { + throw new TException("Unexpected non-SASL transport " + trans.getClass() + ": " + trans); + } + TSaslServerTransport saslTrans = (TSaslServerTransport) trans; + SaslServer saslServer = saslTrans.getSaslServer(); + String authId = saslServer.getAuthorizationID(); + String endUser = authId; + + log.trace("Received SASL RPC from {}", endUser); + + UserGroupInformation clientUgi = UserGroupInformation.createProxyUser(endUser, loginUser); + final String remoteUser = clientUgi.getUserName(); + + try { + // Set the principal in the ThreadLocal for access to get authorizations + principal.set(remoteUser); + + return wrapped.process(inProt, outProt); + } finally { + // Remove (rather than null out) the principal once the call completes so that it cannot be picked up + // by a later call on the same thread and no stale ThreadLocal entry is left behind. 
+ principal.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 7d247f7..8407c15 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.MasterClient; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.security.Authorizations; @@ -56,6 +57,7 @@ import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.cli.ClientOpts; import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.security.SecurityUtil; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; @@ -174,6 +176,13 @@ public class Admin { cl.usage(); return; } + + AccumuloConfiguration siteConf = SiteConfiguration.getInstance(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } + Instance instance = opts.getInstance(); ServerConfigurationFactory confFactory = new ServerConfigurationFactory(instance); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java 
---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index ef182f1..759d898 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -20,9 +20,13 @@ import java.util.List; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.log4j.Logger; @@ -64,6 +68,12 @@ public class ZooZap { return; } + AccumuloConfiguration siteConf = SiteConfiguration.getInstance(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } + String iid = opts.getInstance().getInstanceID(); IZooReaderWriter zoo = ZooReaderWriter.getInstance(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java new file mode 100644 index 0000000..56f3933 --- /dev/null +++ 
b/server/base/src/test/java/org/apache/accumulo/server/AccumuloServerContextTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.rpc.SaslConnectionParams; +import org.apache.accumulo.server.conf.ServerConfigurationFactory; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.UserGroupInformation; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AccumuloServerContextTest { + + private String user; + + @Before + 
public void setup() throws Exception { + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + user = UserGroupInformation.getCurrentUser().getUserName(); + } + + @Test + public void testSasl() throws Exception { + MockInstance instance = new MockInstance(); + + ClientConfiguration clientConf = ClientConfiguration.loadDefault(); + clientConf.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true"); + clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY, "accumulo"); + final AccumuloConfiguration conf = ClientContext.convertClientConfig(clientConf); + SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); + + ServerConfigurationFactory factory = EasyMock.createMock(ServerConfigurationFactory.class); + EasyMock.expect(factory.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); + EasyMock.expect(factory.getInstance()).andReturn(instance).anyTimes(); + + AccumuloServerContext context = EasyMock.createMockBuilder(AccumuloServerContext.class).addMockedMethod("enforceKerberosLogin") + .addMockedMethod("getConfiguration").addMockedMethod("getServerConfigurationFactory").createMock(); + context.enforceKerberosLogin(); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.expect(context.getServerConfigurationFactory()).andReturn(factory).anyTimes(); + + // Just make the SiteConfiguration delegate to our ClientConfiguration (by way of the AccumuloConfiguration) + // Presently, we only need get(Property) and iterator(). 
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() { + @Override + public String answer() { + Object[] args = EasyMock.getCurrentArguments(); + return conf.get((Property) args[0]); + } + }).anyTimes(); + + EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() { + @Override + public Iterator<Entry<String,String>> answer() { + return conf.iterator(); + } + }).anyTimes(); + + EasyMock.replay(factory, context, siteConfig); + + Assert.assertEquals(ThriftServerType.SASL, context.getThriftServerType()); + SaslConnectionParams saslParams = context.getServerSaslParams(); + Assert.assertEquals(SaslConnectionParams.forConfig(conf), saslParams); + Assert.assertEquals(user, saslParams.getPrincipal()); + + EasyMock.verify(factory, context, siteConfig); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java new file mode 100644 index 0000000..aba1aa0 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TCredentialsUpdatingInvocationHandlerTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.rpc; + +import java.nio.ByteBuffer; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.server.thrift.UGIAssumingProcessor; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TCredentialsUpdatingInvocationHandlerTest { + + TCredentialsUpdatingInvocationHandler<Object> proxy; + + @Before + public void setup() { + proxy = new TCredentialsUpdatingInvocationHandler<Object>(new Object()); + } + + @After + public void teardown() { + UGIAssumingProcessor.principal.set(null); + } + + @Test + public void testNoArgsAreIgnored() throws Exception { + proxy.updateArgs(new Object[] {}); + } + + @Test + public void testNoTCredsInArgsAreIgnored() throws Exception { + proxy.updateArgs(new Object[] {new Object(), new Object()}); + } + + @Test + public void testCachedTokenClass() throws Exception { + final String principal = "root"; + ConcurrentHashMap<String,Class<? 
extends AuthenticationToken>> cache = proxy.getTokenCache(); + cache.clear(); + TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString()); + UGIAssumingProcessor.principal.set(principal); + proxy.updateArgs(new Object[] {new Object(), tcreds}); + Assert.assertEquals(1, cache.size()); + Assert.assertEquals(KerberosToken.class, cache.get(KerberosToken.CLASS_NAME)); + } + + @Test(expected = ThriftSecurityException.class) + public void testMissingPrincipal() throws Exception { + final String principal = "root"; + TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString()); + UGIAssumingProcessor.principal.set(null); + proxy.updateArgs(new Object[] {new Object(), tcreds}); + } + + @Test(expected = ThriftSecurityException.class) + public void testMismatchedPrincipal() throws Exception { + final String principal = "root"; + TCredentials tcreds = new TCredentials(principal, KerberosToken.CLASS_NAME, ByteBuffer.allocate(0), UUID.randomUUID().toString()); + UGIAssumingProcessor.principal.set(principal + "foobar"); + proxy.updateArgs(new Object[] {new Object(), tcreds}); + } + + @Test(expected = ThriftSecurityException.class) + public void testWrongTokenType() throws Exception { + final String principal = "root"; + TCredentials tcreds = new TCredentials(principal, PasswordToken.class.getName(), ByteBuffer.allocate(0), UUID.randomUUID().toString()); + UGIAssumingProcessor.principal.set(principal); + proxy.updateArgs(new Object[] {new Object(), tcreds}); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java new file mode 100644 index 0000000..f3f1bdd --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.rpc; + +import static org.junit.Assert.assertEquals; + +import org.apache.accumulo.core.conf.Property; +import org.junit.Test; + +public class ThriftServerTypeTest { + + @Test + public void testDefaultServer() { + assertEquals(ThriftServerType.CUSTOM_HS_HA, ThriftServerType.get(Property.GENERAL_RPC_SERVER_TYPE.getDefaultValue())); + } + + @Test + public void testSpecialServer() { + assertEquals(ThriftServerType.THREADPOOL, ThriftServerType.get("threadpool")); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index c380eb7..7efabb6 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -91,7 +91,9 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.rpc.RpcWrapper; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.Halt; @@ -707,14 +709,21 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa } private HostAndPort startStatsService() throws UnknownHostException { - Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(this)); + Iface rpcProxy = RpcWrapper.service(this); + final Processor<Iface> processor; + if 
(ThriftServerType.SASL == getThriftServerType()) { + Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass()); + processor = new Processor<Iface>(tcProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } int port = getConfiguration().getPort(Property.GC_PORT); long maxMessageSize = getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { - return TServerUtils.startTServer(getConfiguration(), result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, getConfiguration() - .getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), 0).address; + return TServerUtils.startTServer(getConfiguration(), result, getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, + getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), getServerSaslParams(), 0).address; } catch (Exception ex) { log.fatal(ex, ex); throw new RuntimeException(ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index f98721f..1d7f90f 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import 
java.util.List; import java.util.Map; @@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -59,6 +63,8 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -90,14 +96,36 @@ public class GarbageCollectWriteAheadLogsTest { @Before public void setUp() throws Exception { + SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); instance = createMock(Instance.class); expect(instance.getInstanceID()).andReturn("mock").anyTimes(); - systemConfig = createMock(AccumuloConfiguration.class); + expect(instance.getZooKeepers()).andReturn("localhost").anyTimes(); + expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes(); + systemConfig = new ConfigurationCopy(new HashMap<String,String>()); volMgr = createMock(VolumeManager.class); ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class); expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes(); expect(factory.getInstance()).andReturn(instance).anyTimes(); - replay(instance, factory); + expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); + + // Just make the SiteConfiguration delegate to our AccumuloConfiguration + // Presently, we only need get(Property) and iterator(). 
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() { + @Override + public String answer() { + Object[] args = EasyMock.getCurrentArguments(); + return systemConfig.get((Property) args[0]); + } + }).anyTimes(); + + EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() { + @Override + public Iterator<Entry<String,String>> answer() { + return systemConfig.iterator(); + } + }).anyTimes(); + + replay(instance, factory, siteConfig); AccumuloServerContext context = new AccumuloServerContext(factory); gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false); modTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java index 99558b8..6fcdd37 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java @@ -29,10 +29,15 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.trace.thrift.TInfo; @@ -42,6 +47,7 @@ import 
org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.hadoop.fs.Path; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; @@ -51,20 +57,42 @@ public class SimpleGarbageCollectorTest { private Credentials credentials; private Opts opts; private SimpleGarbageCollector gc; - private AccumuloConfiguration systemConfig; + private ConfigurationCopy systemConfig; @Before public void setUp() { volMgr = createMock(VolumeManager.class); instance = createMock(Instance.class); + SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); expect(instance.getInstanceID()).andReturn("mock").anyTimes(); + expect(instance.getZooKeepers()).andReturn("localhost").anyTimes(); + expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes(); opts = new Opts(); - systemConfig = mockSystemConfig(); + systemConfig = createSystemConfig(); ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class); expect(factory.getInstance()).andReturn(instance).anyTimes(); - expect(factory.getConfiguration()).andReturn(mockSystemConfig()).anyTimes(); - replay(instance, factory); + expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes(); + expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); + + // Just make the SiteConfiguration delegate to our AccumuloConfiguration + // Presently, we only need get(Property) and iterator(). 
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() { + @Override + public String answer() { + Object[] args = EasyMock.getCurrentArguments(); + return systemConfig.get((Property) args[0]); + } + }).anyTimes(); + + EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() { + @Override + public Iterator<Entry<String,String>> answer() { + return systemConfig.iterator(); + } + }).anyTimes(); + + replay(instance, factory, siteConfig); credentials = SystemCredentials.get(instance); gc = new SimpleGarbageCollector(opts, volMgr, factory); @@ -76,26 +104,20 @@ public class SimpleGarbageCollectorTest { assertNotNull(gc.getStatus(createMock(TInfo.class), createMock(TCredentials.class))); } - private AccumuloConfiguration mockSystemConfig() { - AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L); - expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2); - expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false); - expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false); - replay(systemConfig); - return systemConfig; + private ConfigurationCopy createSystemConfig() { + Map<String,String> conf = new HashMap<>(); + conf.put(Property.INSTANCE_RPC_SASL_ENABLED.getKey(), "false"); + conf.put(Property.GC_CYCLE_START.getKey(), "1"); + conf.put(Property.GC_CYCLE_DELAY.getKey(), "20"); + conf.put(Property.GC_DELETE_THREADS.getKey(), "2"); + conf.put(Property.GC_TRASH_IGNORE.getKey(), "false"); + conf.put(Property.GC_FILE_ARCHIVE.getKey(), "false"); + + return new ConfigurationCopy(conf); } @Test public void testInit() throws Exception { - EasyMock.reset(systemConfig); - 
expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L).times(2); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L); - expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2).times(2); - expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(false); - replay(systemConfig); assertSame(volMgr, gc.getVolumeManager()); assertSame(instance, gc.getInstance()); assertEquals(credentials, gc.getCredentials()); @@ -124,13 +146,7 @@ public class SimpleGarbageCollectorTest { @Test public void testMoveToTrash_NotUsingTrash() throws Exception { - AccumuloConfiguration systemConfig = createMock(AccumuloConfiguration.class); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_START)).andReturn(1000L); - expect(systemConfig.getTimeInMillis(Property.GC_CYCLE_DELAY)).andReturn(20000L); - expect(systemConfig.getCount(Property.GC_DELETE_THREADS)).andReturn(2); - expect(systemConfig.getBoolean(Property.GC_FILE_ARCHIVE)).andReturn(false); - expect(systemConfig.getBoolean(Property.GC_TRASH_IGNORE)).andReturn(true); - replay(systemConfig); + systemConfig.set(Property.GC_TRASH_IGNORE.getKey(), "true"); Path path = createMock(Path.class); assertFalse(gc.archiveOrMoveToTrash(path)); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index cad1e01..120692a 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -25,6 +25,7 @@ import static org.easymock.EasyMock.verify; 
import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -41,6 +42,9 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; @@ -84,12 +88,34 @@ public class CloseWriteAheadLogReferencesTest { @Before public void setup() { inst = createMock(Instance.class); + SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); expect(inst.getInstanceID()).andReturn(testName.getMethodName()).anyTimes(); - AccumuloConfiguration systemConf = createMock(AccumuloConfiguration.class); + expect(inst.getZooKeepers()).andReturn("localhost").anyTimes(); + expect(inst.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes(); + final AccumuloConfiguration systemConf = new ConfigurationCopy(new HashMap<String,String>()); ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class); expect(factory.getConfiguration()).andReturn(systemConf).anyTimes(); expect(factory.getInstance()).andReturn(inst).anyTimes(); - replay(inst, factory); + expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); + + // Just make the SiteConfiguration delegate to our AccumuloConfiguration + // Presently, we only need get(Property) and iterator(). 
+ EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer<String>() { + @Override + public String answer() { + Object[] args = EasyMock.getCurrentArguments(); + return systemConf.get((Property) args[0]); + } + }).anyTimes(); + + EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer<Iterator<Entry<String,String>>>() { + @Override + public Iterator<Entry<String,String>> answer() { + return systemConf.iterator(); + } + }).anyTimes(); + + replay(inst, factory, siteConfig); refs = new CloseWriteAheadLogReferences(new AccumuloServerContext(factory)); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 12195fa..a6ea6ea 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -118,7 +118,9 @@ import org.apache.accumulo.server.metrics.Metrics; import org.apache.accumulo.server.replication.ZooKeeperInitialization; import org.apache.accumulo.server.rpc.RpcWrapper; import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.AuditedSecurityOperation; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.SecurityUtil; @@ -1090,7 +1092,14 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot); clientHandler = new MasterClientServiceHandler(this); - Processor<Iface> processor = new 
Processor<Iface>(RpcWrapper.service(clientHandler)); + Iface rpcProxy = RpcWrapper.service(clientHandler); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass()); + processor = new Processor<Iface>(tcredsProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } ServerAddress sa = TServerUtils.startServer(this, hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); clientService = sa.server; http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java index 580852d..e8dacaf 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java @@ -221,6 +221,8 @@ public class CompactRange extends MasterRepo { if (iterators.size() > 0 || !compactionStrategy.equals(CompactionStrategyConfigUtil.DEFAULT_STRATEGY)) { this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy)); + } else { + log.info("No iterators or compaction strategy"); } if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0) @@ -256,6 +258,9 @@ public class CompactRange extends MasterRepo { if (tokens[i].startsWith(txidString)) continue; // skip self + log.debug("txidString : " + txidString); + log.debug("tokens[" + i + "] : " + tokens[i]); + throw new 
ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Another compaction with iterators and/or a compaction strategy is running"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java index 2d98fed..1a098c2 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/Basic.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.servlets.BasicServlet; import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.tracer.TraceFormatter; abstract class Basic extends BasicServlet { @@ -88,6 +89,10 @@ abstract class Basic extends BasicServlet { at = token; } + if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + principal = SecurityUtil.getServerPrincipal(principal); + } + String table = conf.get(Property.TRACE_TABLE); try { Connector conn = HdfsZooInstance.getInstance().getConnector(principal, at); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 3063cdc..f855d9c 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ 
b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -176,10 +176,16 @@ public class TraceServer implements Watcher { Connector connector = null; while (true) { try { + final boolean isDefaultTokenType = conf.get(Property.TRACE_TOKEN_TYPE).equals(Property.TRACE_TOKEN_TYPE.getDefaultValue()); String principal = conf.get(Property.TRACE_USER); + if (conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + // Make sure that we replace _HOST if it exists in the principal + principal = SecurityUtil.getServerPrincipal(principal); + } AuthenticationToken at; Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX); - if (loginMap.isEmpty()) { + if (loginMap.isEmpty() && isDefaultTokenType) { + // Assume the old type of user/password specification Property p = Property.TRACE_PASSWORD; at = new PasswordToken(conf.get(p).getBytes(UTF_8)); } else { http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 2bfa5a0..b08340f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -172,7 +172,9 @@ import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.replication.ZooKeeperInitialization; import org.apache.accumulo.server.rpc.RpcWrapper; import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.AuditedSecurityOperation; import 
org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.security.SecurityUtil; @@ -315,7 +317,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { Instance instance = getInstance(); this.sessionManager = new SessionManager(aconf); this.logSorter = new LogSorter(instance, fs, aconf); - this.replWorker = new ReplicationWorker(instance, fs, aconf); + this.replWorker = new ReplicationWorker(this, fs); this.statsKeeper = new TabletStatsKeeper(); SimpleTimer.getInstance(aconf).schedule(new Runnable() { @Override @@ -2272,8 +2274,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - Iface tch = RpcWrapper.service(new ThriftClientHandler()); - Processor<Iface> processor = new Processor<Iface>(tch); + Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler()); + final Processor<Iface> processor; + if (ThriftServerType.SASL == getThriftServerType()) { + Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class); + processor = new Processor<Iface>(tcredProxy); + } else { + processor = new Processor<Iface>(rpcProxy); + } HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(), clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); log.info("address = " + address); @@ -2281,7 +2289,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } private HostAndPort startReplicationService() throws UnknownHostException { - ReplicationServicer.Iface repl = RpcWrapper.service(new ReplicationServicerHandler(this)); + final ReplicationServicerHandler handler = new ReplicationServicerHandler(this); + ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler); + ReplicationServicer.Iface repl = TCredentialsUpdatingWrapper.service(rpcProxy, 
handler.getClass()); ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration(); Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java index bd6bcd3..de99029 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java @@ -19,13 +19,14 @@ package org.apache.accumulo.tserver.replication; import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -39,13 +40,15 @@ public class ReplicationWorker implements Runnable { private Instance inst; private VolumeManager fs; + private 
Credentials creds; private AccumuloConfiguration conf; private ThreadPoolExecutor executor; - public ReplicationWorker(Instance inst, VolumeManager fs, AccumuloConfiguration conf) { - this.inst = inst; + public ReplicationWorker(ClientContext clientCtx, VolumeManager fs) { + this.inst = clientCtx.getInstance(); this.fs = fs; - this.conf = conf; + this.conf = clientCtx.getConfiguration(); + this.creds = clientCtx.getCredentials(); } public void setExecutor(ThreadPoolExecutor executor) { @@ -69,7 +72,7 @@ public class ReplicationWorker implements Runnable { workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_WORK_QUEUE, conf); } - workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get(inst)), executor); + workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, creds), executor); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/Shell.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java index d897fc3..a64ff45 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java @@ -199,7 +199,6 @@ public class Shell extends ShellOptions { protected Instance instance; private Connector connector; protected ConsoleReader reader; - private String principal; private AuthenticationToken token; private final Class<? extends Formatter> defaultFormatterClass = DefaultFormatter.class; private final Class<? 
extends Formatter> binaryFormatterClass = BinaryFormatter.class; @@ -275,8 +274,22 @@ public class Shell extends ShellOptions { authTimeout = TimeUnit.MINUTES.toNanos(options.getAuthTimeout()); disableAuthTimeout = options.isAuthTimeoutDisabled(); + ClientConfiguration clientConf; + try { + clientConf = options.getClientConfiguration(); + } catch (Exception e) { + printException(e); + return true; + } + // get the options that were parsed - String user = options.getUsername(); + final String user; + try { + user = options.getUsername(); + } catch (Exception e) { + printException(e); + return true; + } String password = options.getPassword(); tabCompletion = !options.isTabCompletionDisabled(); @@ -285,7 +298,13 @@ public class Shell extends ShellOptions { setInstance(options); // AuthenticationToken options - token = options.getAuthenticationToken(); + try { + token = options.getAuthenticationToken(); + } catch (Exception e) { + printException(e); + return true; + } + Map<String,String> loginOptions = options.getTokenProperties(); // process default parameters if unspecified @@ -328,12 +347,11 @@ public class Shell extends ShellOptions { } if (!options.isFake()) { - DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", options.getClientConfiguration()); + DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", clientConf); } this.setTableName(""); - this.principal = user; - connector = instance.getConnector(this.principal, token); + connector = instance.getConnector(user, token); } catch (Exception e) { printException(e); @@ -1157,12 +1175,11 @@ public class Shell extends ShellOptions { public void updateUser(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { connector = instance.getConnector(principal, token); - this.principal = principal; this.token = token; } public String getPrincipal() { - return principal; + return connector.whoami(); } public AuthenticationToken getToken() 
{ http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java index 875367d..be53d5d 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java +++ b/shell/src/main/java/org/apache/accumulo/shell/ShellOptionsJC.java @@ -27,8 +27,10 @@ import java.util.TreeMap; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +41,10 @@ import com.beust.jcommander.ParameterException; import com.beust.jcommander.converters.FileConverter; public class ShellOptionsJC { - private static final Logger log = LoggerFactory.getLogger(Shell.class); + private static final Logger log = LoggerFactory.getLogger(ShellOptionsJC.class); @Parameter(names = {"-u", "--user"}, description = "username (defaults to your OS user)") - private String username = System.getProperty("user.name", "root"); + private String username = null; public static class PasswordConverter implements IStringConverter<String> { public static final String STDIN = "stdin"; @@ -126,7 +128,7 @@ public class ShellOptionsJC { return Class.forName(value).asSubclass(AuthenticationToken.class).newInstance(); } catch (Exception e) { // Catching ClassNotFoundException, ClassCastException, InstantiationException and IllegalAccessException - log.error("Could not 
instantiate AuthenticationToken " + value, e); + log.error("Could not instantiate AuthenticationToken {}", value, e); throw new ParameterException(e); } } @@ -169,6 +171,9 @@ public class ShellOptionsJC { @Parameter(names = {"--ssl"}, description = "use ssl to connect to accumulo") private boolean useSsl = false; + @Parameter(names = "--sasl", description = "use SASL to connect to Accumulo (Kerberos)") + private boolean useSasl = false; + @Parameter(names = "--config-file", description = "read the given client config file. " + "If omitted, the path searched can be specified with $ACCUMULO_CLIENT_CONF_PATH, " + "which defaults to ~/.accumulo/config:$ACCUMULO_CONF_DIR/client.conf:/etc/accumulo/client.conf") @@ -189,7 +194,19 @@ public class ShellOptionsJC { @Parameter(hidden = true) private List<String> unrecognizedOptions; - public String getUsername() { + public String getUsername() throws Exception { + if (null == username) { + final ClientConfiguration clientConf = getClientConfiguration(); + if (Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED))) { + if (!UserGroupInformation.isSecurityEnabled()) { + throw new RuntimeException("Kerberos security is not enabled"); + } + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + username = ugi.getUserName(); + } else { + username = System.getProperty("user.name", "root"); + } + } return username; } @@ -197,7 +214,15 @@ public class ShellOptionsJC { return password; } - public AuthenticationToken getAuthenticationToken() { + public AuthenticationToken getAuthenticationToken() throws Exception { + if (null == authenticationToken) { + final ClientConfiguration clientConf = getClientConfiguration(); + // Automatically use a KerberosToken if the client conf is configured for SASL + final boolean saslEnabled = Boolean.parseBoolean(clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED)); + if (saslEnabled) { + authenticationToken = new KerberosToken(); + } + } return 
authenticationToken; } @@ -275,7 +300,13 @@ public class ShellOptionsJC { if (useSsl()) { clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SSL_ENABLED, "true"); } + if (useSasl()) { + clientConfig.setProperty(ClientProperty.INSTANCE_RPC_SASL_ENABLED, "true"); + } return clientConfig; } + public boolean useSasl() { + return useSasl; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java ---------------------------------------------------------------------- diff --git a/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java new file mode 100644 index 0000000..0c4e4c7 --- /dev/null +++ b/shell/src/test/java/org/apache/accumulo/shell/ShellOptionsJCTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.shell; + +import static org.junit.Assert.assertEquals; + +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.junit.Before; +import org.junit.Test; + +import com.beust.jcommander.JCommander; + +/** + * + */ +public class ShellOptionsJCTest { + + ShellOptionsJC options; + + @Before + public void setup() { + options = new ShellOptionsJC(); + } + + @Test + public void testSasl() throws Exception { + JCommander jc = new JCommander(); + + jc.setProgramName("accumulo shell"); + jc.addObject(options); + jc.parse(new String[] {"--sasl"}); + ClientConfiguration clientConf = options.getClientConfiguration(); + assertEquals("true", clientConf.get(ClientProperty.INSTANCE_RPC_SASL_ENABLED)); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/pom.xml ---------------------------------------------------------------------- diff --git a/test/pom.xml b/test/pom.xml index 16f4125..b58df3c 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -156,6 +156,21 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <!-- Specifically depend on this version of minikdc to avoid having + to increase our normal hadoop dependency --> + <version>2.3.0</version> + <scope>test</scope> + <exclusions> + <!-- Pulls in an older bouncycastle version --> + <exclusion> + <groupId>bouncycastle</groupId> + <artifactId>bcprov-jdk15</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <scope>test</scope> @@ -202,6 +217,7 @@ <timeout.factor>${timeout.factor}</timeout.factor> <org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT>
<org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT> + <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT> </systemPropertyVariables> </configuration> </plugin> @@ -212,6 +228,7 @@ <systemPropertyVariables> <org.apache.accumulo.test.functional.useCredProviderForIT>${useCredProviderForIT}</org.apache.accumulo.test.functional.useCredProviderForIT> <org.apache.accumulo.test.functional.useSslForIT>${useSslForIT}</org.apache.accumulo.test.functional.useSslForIT> + <org.apache.accumulo.test.functional.useKrbForIT>${useKrbForIT}</org.apache.accumulo.test.functional.useKrbForIT> </systemPropertyVariables> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 3bb44ff..0b047cb 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -42,6 +42,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; @@ -102,8 +103,8 @@ public class ZombieTServer { TransactionWatcher watcher = new TransactionWatcher(); final ThriftClientHandler tch = new ThriftClientHandler(context, watcher); Processor<Iface> 
processor = new Processor<Iface>(tch); - ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", - "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1); + ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), ThriftServerType.CUSTOM_HS_HA, + processor, "ZombieTServer", "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1); String addressString = serverPort.address.toString(); String zPath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addressString; http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 0afa243..b429607 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -70,6 +70,7 @@ import org.apache.accumulo.server.master.state.MetaDataTableScanner; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletLocationState; import org.apache.accumulo.server.rpc.TServerUtils; +import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; @@ -254,8 +255,8 @@ public class NullTserver { TransactionWatcher watcher = new TransactionWatcher(); ThriftClientHandler tch = new ThriftClientHandler(new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())), watcher); Processor<Iface> processor = new 
Processor<Iface>(tch); - TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000, - 10 * 1024 * 1024, null, -1); + TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer", + "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); http://git-wip-us.apache.org/repos/asf/accumulo/blob/4f19aa1f/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java index 8f7e1b7..c1ad17b 100644 --- a/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java +++ b/test/src/test/java/org/apache/accumulo/harness/AccumuloClusterIT.java @@ -34,7 +34,9 @@ import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; @@ -48,6 +50,7 @@ import com.google.common.base.Preconditions; */ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniClusterConfigurationCallback { private static final Logger log = LoggerFactory.getLogger(AccumuloClusterIT.class); + private static final String TRUE = Boolean.toString(true); public static enum ClusterType { MINI, STANDALONE; @@ -62,15 +65,68 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste protected static AccumuloCluster cluster; protected static 
ClusterType type; protected static AccumuloClusterPropertyConfiguration clusterConf; + protected static TestingKdc krb; @BeforeClass public static void setUp() throws Exception { clusterConf = AccumuloClusterPropertyConfiguration.get(); type = clusterConf.getClusterType(); + if (ClusterType.MINI == type && TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION))) { + krb = new TestingKdc(); + krb.start(); + } + initialized = true; } + @AfterClass + public static void tearDownKdc() throws Exception { + if (null != krb) { + krb.stop(); + } + } + + /** + * {@link TestingKdc#getAccumuloKeytab()} + */ + public static File getAccumuloKeytab() { + if (null == krb) { + throw new RuntimeException("KDC not enabled"); + } + return krb.getAccumuloKeytab(); + } + + /** + * {@link TestingKdc#getAccumuloPrincipal()} + */ + public static String getAccumuloPrincipal() { + if (null == krb) { + throw new RuntimeException("KDC not enabled"); + } + return krb.getAccumuloPrincipal(); + } + + /** + * {@link TestingKdc#getClientKeytab()} + */ + public static File getClientKeytab() { + if (null == krb) { + throw new RuntimeException("KDC not enabled"); + } + return krb.getClientKeytab(); + } + + /** + * {@link TestingKdc#getClientPrincipal()} + */ + public static String getClientPrincipal() { + if (null == krb) { + throw new RuntimeException("KDC not enabled"); + } + return krb.getClientPrincipal(); + } + @Before public void setupCluster() throws Exception { // Before we try to instantiate the cluster, check to see if the test even wants to run against this type of cluster @@ -80,7 +136,7 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste case MINI: MiniClusterHarness miniClusterHarness = new MiniClusterHarness(); // Intrinsically performs the callback to let tests alter MiniAccumuloConfig and core-site.xml - cluster = miniClusterHarness.create(this, getToken()); + cluster = miniClusterHarness.create(this, getToken(), krb); break; case 
STANDALONE: StandaloneAccumuloClusterConfiguration conf = (StandaloneAccumuloClusterConfiguration) clusterConf; @@ -98,6 +154,10 @@ public abstract class AccumuloClusterIT extends AccumuloIT implements MiniCluste if (type.isDynamic()) { cluster.start(); + if (null != krb) { + // Log in the 'client' user + UserGroupInformation.loginUserFromKeytab(getClientPrincipal(), getClientKeytab().getAbsolutePath()); + } } else { log.info("Removing tables which appear to be from a previous test run"); cleanupTables();
