This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b38a2dbf65 Added ScanServer property for allowed tables (#6146)
b38a2dbf65 is described below
commit b38a2dbf656e83589eaffa64c89eafa47d242c2d
Author: Dave Marion <[email protected]>
AuthorDate: Wed Feb 25 07:33:49 2026 -0500
Added ScanServer property for allowed tables (#6146)
Added a property that allows users to configure, at the ScanServer
group level, which tables clients are allowed to scan. When the
property is not set, scans are allowed on all tables outside the
accumulo namespace.
Closes #6123
---
.../org/apache/accumulo/core/conf/Property.java | 7 +
.../org/apache/accumulo/tserver/ScanServer.java | 133 +++++++++-
.../accumulo/tserver/ThriftScanClientHandler.java | 2 +-
.../apache/accumulo/tserver/ScanServerTest.java | 81 ++++--
.../accumulo/test/ScanServerAllowedTablesIT.java | 292 +++++++++++++++++++++
5 files changed, 480 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 17edff8d2e..90602ad616 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -561,6 +561,13 @@ public enum Property {
"The amount of time a scan reference is unused before its deleted from
metadata table.",
"2.1.0"),
@Experimental
+ SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null,
PropertyType.PREFIX,
+ "A regular expression that determines which tables are allowed to be
scanned for"
+ + " servers in the specified group. The property name should end
with the scan server"
+ + " group and the property value should take into account the table
namespace and name."
+ + " The default value disallows scans on tables in the accumulo
namespace.",
+ "2.1.5"),
+ @Experimental
SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s",
PropertyType.TIMEDURATION,
"The time between adjustments of the thrift server thread pool.",
"2.1.0"),
// properties that are specific to tablet server behavior
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 83f57347b1..53b121f08d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL;
@@ -45,6 +46,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -54,6 +58,7 @@ import
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.cluster.ClusterConfigParser;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
@@ -88,6 +93,7 @@ import
org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.AbstractServer;
@@ -111,6 +117,7 @@ import
org.apache.accumulo.tserver.session.SingleScanSession;
import org.apache.accumulo.tserver.tablet.SnapshotTablet;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.zookeeper.KeeperException;
@@ -177,6 +184,8 @@ public class ScanServer extends AbstractServer
}
private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+ // Default pattern to allow scans on all tables not in accumulo namespace
+ private static final String DEFAULT_SCAN_ALLOWED_PATTERN =
"^(?!accumulo\\.).*$";
protected ThriftScanClientHandler delegate;
private UUID serverLockUUID;
@@ -213,6 +222,9 @@ public class ScanServer extends AbstractServer
private final String groupName;
+ private final ConcurrentHashMap<TableId,Boolean> allowedTables = new
ConcurrentHashMap<>();
+ private volatile String currentAllowedTableRegex;
+
public ScanServer(ScanServerOpts opts, String[] args) {
super("sserver", opts, args);
@@ -388,6 +400,7 @@ public class ScanServer extends AbstractServer
}
SecurityUtil.serverLogin(getConfiguration());
+ updateAllowedTables(false);
ServerAddress address = null;
try {
@@ -423,6 +436,7 @@ public class ScanServer extends AbstractServer
Thread.sleep(1000);
updateIdleStatus(sessionManager.getActiveScans().isEmpty()
&& tabletMetadataCache.estimatedSize() == 0);
+ updateAllowedTables(false);
} catch (InterruptedException e) {
LOG.info("Interrupt Exception received, shutting down");
gracefulShutdown(getContext().rpcCreds());
@@ -477,6 +491,106 @@ public class ScanServer extends AbstractServer
}
}
+  /**
+   * Determines whether the given table may be scanned through this ScanServer.
+   *
+   * <p>
+   * The answer normally comes from the {@code allowedTables} cache. When the cache has no entry
+   * for the table, the table list cache is cleared and the mapping is rebuilt, waiting before
+   * each attempt, until an entry appears or the retries are used up.
+   *
+   * @param credentials credentials handed to the delegate when validating that the table exists
+   * @param tid id of the table being scanned
+   * @return true if scans of the table are allowed, false otherwise
+   * @throws ThriftSecurityException propagated from the delegate's namespace lookup
+   * @throws IllegalStateException if no mapping could be obtained, either because the retries
+   *         were used up or because the thread was interrupted while waiting
+   */
+  // Visible for testing
+  protected boolean isAllowed(TCredentials credentials, TableId tid)
+      throws ThriftSecurityException {
+    final Boolean cached = allowedTables.get(tid);
+    if (cached != null) {
+      return cached;
+    }
+
+    final Retry retry =
+        Retry.builder().maxRetries(10).retryAfter(1, SECONDS).incrementBy(0, SECONDS)
+            .maxWait(2, SECONDS).backOffFactor(1.0).logInterval(3, SECONDS).createRetry();
+
+    Boolean refreshed = null;
+    while (refreshed == null && retry.canRetry()) {
+      try {
+        retry.waitForNextAttempt(LOG,
+            "Allowed tables mapping does not contain an entry for table: " + tid
+                + ", refreshing table...");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        // Report the interruption itself instead of falling through to the
+        // "within 10s" error below, which would misstate why the lookup failed.
+        throw new IllegalStateException(
+            "Interrupted while waiting for allowed table mapping for table: " + tid, e);
+      }
+      // Clear the cache and try again, maybe there
+      // is a race condition in table creation and scan
+      updateAllowedTables(true);
+      // validate that the table exists, else throw
+      delegate.getNamespaceId(credentials, tid);
+      refreshed = allowedTables.get(tid);
+      retry.useRetry();
+    }
+
+    if (refreshed == null) {
+      // Ran out of retries
+      throw new IllegalStateException(
+          "Unable to get allowed table mapping for table: " + tid + " within 10s");
+    }
+    return refreshed;
+  }
+
+  /**
+   * Refreshes the {@code allowedTables} cache. Entries for tables that no longer exist are
+   * dropped, and every current table is marked allowed or disallowed according to the regular
+   * expression configured for this server's group. When the property is not set the default
+   * pattern is used; when the configured expression does not compile every table is marked
+   * disallowed.
+   *
+   * @param clearCache if true, the context's table list cache is cleared before the tables are
+   *        read
+   */
+  private synchronized void updateAllowedTables(boolean clearCache) {
+
+    LOG.trace("Updating allowed tables for ScanServer");
+    if (clearCache) {
+      context.clearTableListCache();
+    }
+
+    // Remove tables that no longer exist. The id-to-name map is fetched once
+    // instead of once per cached entry.
+    final var existingTables = getContext().getTableIdToNameMap();
+    allowedTables.keySet().removeIf(tid -> {
+      if (existingTables.containsKey(tid)) {
+        return false;
+      }
+      LOG.trace("Removing table {} from allowed table map as it no longer exists", tid);
+      return true;
+    });
+
+    final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName;
+    String allowedTableRegex = getConfiguration().get(propName);
+    if (allowedTableRegex == null) {
+      allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN;
+    }
+
+    // This method is invoked repeatedly, so value-related messages are only
+    // logged when the value differs from the one seen on the previous call.
+    final boolean regexChanged = !allowedTableRegex.equals(currentAllowedTableRegex);
+    if (currentAllowedTableRegex == null) {
+      LOG.trace("Property {} initial value: {}", propName, allowedTableRegex);
+    } else if (regexChanged) {
+      LOG.info("Property {} has changed. Old value: {}, new value: {}", propName,
+          currentAllowedTableRegex, allowedTableRegex);
+    }
+    // Remember the value even if it turns out to be invalid; otherwise an invalid
+    // expression would be reported as a fresh change, and as an error, on every call.
+    currentAllowedTableRegex = allowedTableRegex;
+
+    Pattern allowedTablePattern;
+    try {
+      allowedTablePattern = Pattern.compile(allowedTableRegex);
+    } catch (PatternSyntaxException e) {
+      if (regexChanged) {
+        LOG.error(
+            "Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.",
+            propName, allowedTableRegex, e);
+      }
+      allowedTablePattern = null;
+    }
+
+    final Pattern p = allowedTablePattern;
+    context.getTableNameToIdMap().forEach((tname, tid) -> {
+      if (p == null) {
+        // No usable pattern: nothing may be scanned.
+        allowedTables.put(tid, Boolean.FALSE);
+      } else if (p.matcher(tname).matches()) {
+        LOG.trace("Table {} can now be scanned via this ScanServer", tname);
+        allowedTables.put(tid, Boolean.TRUE);
+      } else {
+        LOG.trace("Table {} cannot be scanned via this ScanServer", tname);
+        allowedTables.put(tid, Boolean.FALSE);
+      }
+    });
+
+  }
+
@SuppressWarnings("unchecked")
private Map<KeyExtent,TabletMetadata>
getTabletMetadata(Collection<KeyExtent> extents) {
if (tabletMetadataCache == null) {
@@ -945,11 +1059,6 @@ public class ScanServer extends AbstractServer
};
}
- /* Exposed for testing */
- protected boolean isSystemUser(TCredentials creds) {
- return context.getSecurityOperation().isSystemUser(creds);
- }
-
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials,
TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo>
ssiList,
@@ -966,9 +1075,10 @@ public class ScanServer extends AbstractServer
KeyExtent extent = getKeyExtent(textent);
- if (extent.isMeta() && !isSystemUser(credentials)) {
- throw new TException(
- "Only the system user can perform eventual consistency scans on the
root and metadata tables");
+ if (!isAllowed(credentials, extent.tableId())) {
+ throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
+ "Scan of table " + extent.tableId() + " disallowed by property: "
+ + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName);
}
try (ScanReservation reservation =
@@ -1038,9 +1148,10 @@ public class ScanServer extends AbstractServer
for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) {
KeyExtent extent = getKeyExtent(entry.getKey());
- if (extent.isMeta() &&
!context.getSecurityOperation().isSystemUser(credentials)) {
- throw new TException(
- "Only the system user can perform eventual consistency scans on
the root and metadata tables");
+ if (!isAllowed(credentials, extent.tableId())) {
+ throw new TApplicationException(TApplicationException.INTERNAL_ERROR,
+ "Scan of table " + extent.tableId() + " disallowed by property: "
+ + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() +
this.groupName);
}
batch.put(extent, entry.getValue());
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index b254fd3e40..e0a54c8374 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -109,7 +109,7 @@ public class ThriftScanClientHandler implements
TabletScanClientService.Iface {
.getTimeInMillis(Property.TSERV_SCAN_RESULTS_MAX_TIMEOUT);
}
- private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
+ public NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
throws ThriftSecurityException {
try {
return server.getContext().getNamespaceId(tableId);
diff --git
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
index 5a08040872..968f75e1b2 100644
---
a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
+++
b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
@@ -24,7 +24,9 @@ import static org.easymock.EasyMock.partialMockBuilder;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -32,8 +34,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
@@ -43,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
import org.apache.accumulo.core.dataImpl.thrift.TColumn;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.process.thrift.ServerProcessService;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
@@ -55,6 +61,7 @@ import
org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
import org.apache.accumulo.tserver.tablet.SnapshotTablet;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletBase;
+import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.junit.jupiter.api.Test;
@@ -65,7 +72,7 @@ public class ScanServerTest {
private KeyExtent extent;
private TabletResolver resolver;
private ScanReservation reservation;
- private boolean systemUser;
+ private ConcurrentHashMap<TableId,TableId> allowedTables;
protected TestScanServer(ScanServerOpts opts, String[] args) {
super(opts, args);
@@ -114,13 +121,17 @@ public class ScanServerTest {
}
@Override
- protected boolean isSystemUser(TCredentials creds) {
- return systemUser;
+ public boolean isShutdownRequested() {
+ return false;
}
@Override
- public boolean isShutdownRequested() {
- return false;
+ protected boolean isAllowed(TCredentials credentials, TableId tid) {
+ return allowedTables.containsKey(tid);
+ }
+
+ public void addAllowedTable(TableId tid) {
+ allowedTables.put(tid, tid);
}
}
@@ -147,6 +158,8 @@ public class ScanServerTest {
TabletResolver resolver = createMock(TabletResolver.class);
TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+ TableId tid = TableId.of("42");
+ expect(sextent.tableId()).andReturn(tid).once();
expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
reservation.close();
@@ -157,14 +170,15 @@ public class ScanServerTest {
expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult());
handler.closeScan(tinfo, 15);
- replay(reservation, handler);
+ replay(reservation, sextent, handler);
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(tid);
ss.delegate = handler;
ss.extent = sextent;
ss.resolver = resolver;
ss.reservation = reservation;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = false;
TKeyExtent textent = createMock(TKeyExtent.class);
InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10,
titer, ssio, auths,
@@ -172,7 +186,7 @@ public class ScanServerTest {
assertEquals(15, is.getScanID());
ss.continueScan(tinfo, is.getScanID(), 0L);
ss.closeScan(tinfo, is.getScanID());
- verify(reservation, handler);
+ verify(reservation, sextent, handler);
}
@Test
@@ -194,18 +208,20 @@ public class ScanServerTest {
Map<String,String> execHints = new HashMap<>();
ScanReservation reservation = createMock(ScanReservation.class);
- expect(extent.isMeta()).andReturn(false).anyTimes();
+ TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+ TableId tid = TableId.of("42");
+ expect(extent.tableId()).andReturn(tid).once();
expect(extent.toThrift()).andReturn(textent).anyTimes();
expect(reservation.getFailures()).andReturn(Map.of(textent, ranges));
reservation.close();
replay(extent, reservation);
- TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(tid);
ss.extent = extent;
ss.delegate = handler;
ss.reservation = reservation;
- ss.systemUser = false;
assertThrows(NotServingTabletException.class, () -> {
ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio,
auths, false, false, 10,
@@ -246,7 +262,8 @@ public class ScanServerTest {
};
TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
- expect(extent.isMeta()).andReturn(false).anyTimes();
+ TableId tid = TableId.of("42");
+ expect(extent.tableId()).andReturn(tid).once();
expect(reservation.newTablet(ss, extent)).andReturn(tablet);
expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent));
expect(reservation.getFailures()).andReturn(Map.of());
@@ -259,12 +276,13 @@ public class ScanServerTest {
replay(extent, reservation, handler);
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(tid);
ss.delegate = handler;
ss.extent = extent;
ss.resolver = resolver;
ss.reservation = reservation;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = false;
Map<TKeyExtent,List<TRange>> extents = new HashMap<>();
extents.put(createMock(TKeyExtent.class), ranges);
@@ -309,7 +327,8 @@ public class ScanServerTest {
};
TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
- expect(extent.isMeta()).andReturn(false).anyTimes();
+ TableId tid = TableId.of("42");
+ expect(extent.tableId()).andReturn(tid).once();
expect(reservation.newTablet(ss, extent)).andReturn(tablet).anyTimes();
expect(reservation.getTabletMetadataExtents()).andReturn(Set.of());
expect(reservation.getFailures()).andReturn(Map.of(textent,
ranges)).anyTimes();
@@ -321,12 +340,13 @@ public class ScanServerTest {
replay(extent, reservation, handler);
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(tid);
ss.delegate = handler;
ss.extent = extent;
ss.resolver = resolver;
ss.reservation = reservation;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = false;
Map<TKeyExtent,List<TRange>> extents = new HashMap<>();
extents.put(textent, ranges);
@@ -370,7 +390,6 @@ public class ScanServerTest {
ss.delegate = handler;
ss.resolver = resolver;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = false;
assertThrows(TException.class, () -> {
ss.startMultiScan(tinfo, tcreds, extents, tcols, titer, ssio, auths,
false, tsc, 30L,
@@ -380,7 +399,7 @@ public class ScanServerTest {
}
@Test
- public void testScanMetaTablesSystemUser() throws Exception {
+ public void testScanDefaultAllowedTables() throws Exception {
handler = createMock(ThriftScanClientHandler.class);
TInfo tinfo = createMock(TInfo.class);
@@ -399,7 +418,7 @@ public class ScanServerTest {
TabletResolver resolver = createMock(TabletResolver.class);
TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
- expect(sextent.isMeta()).andReturn(true).anyTimes();
+ expect(sextent.tableId()).andReturn(MetadataTable.ID).once();
expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
reservation.close();
@@ -412,12 +431,13 @@ public class ScanServerTest {
replay(sextent, reservation, handler);
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(MetadataTable.ID);
ss.delegate = handler;
ss.extent = sextent;
ss.resolver = resolver;
ss.reservation = reservation;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = true;
TKeyExtent textent = createMock(TKeyExtent.class);
InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10,
titer, ssio, auths,
@@ -430,7 +450,7 @@ public class ScanServerTest {
}
@Test
- public void testScanMetaTablesNonSystemUser() throws Exception {
+ public void testScanDisallowedTable() throws Exception {
handler = createMock(ThriftScanClientHandler.class);
TInfo tinfo = createMock(TInfo.class);
@@ -448,25 +468,40 @@ public class ScanServerTest {
TabletResolver resolver = createMock(TabletResolver.class);
TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
- expect(sextent.isMeta()).andReturn(true).anyTimes();
+ expect(sextent.tableId()).andReturn(MetadataTable.ID).anyTimes();
expect(reservation.getFailures()).andReturn(Map.of()).anyTimes();
replay(sextent, reservation, handler);
+ ss.allowedTables = new ConcurrentHashMap<>();
+ ss.addAllowedTable(TableId.of("42"));
ss.delegate = handler;
ss.extent = sextent;
ss.resolver = resolver;
ss.reservation = reservation;
ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234);
- ss.systemUser = false;
TKeyExtent textent = createMock(TKeyExtent.class);
- assertThrows(TException.class, () -> {
+ TException te = assertThrows(TException.class, () -> {
ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio,
auths, false, false, 10,
tsc, 30L, classLoaderContext, execHints, 0L);
});
+ assertTrue(te instanceof TApplicationException);
+ TApplicationException tae = (TApplicationException) te;
+ assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+ assertTrue(tae.getMessage().contains("disallowed by property"));
verify(sextent, reservation, handler);
}
+  /**
+   * Checks the table-name expression against sample names: names beginning with "accumulo." must
+   * not match, other names must.
+   */
+  @Test
+  public void testTableNameRegex() {
+    final Pattern notAccumuloNamespace = Pattern.compile("^(?!accumulo\\.).*$");
+
+    // Names in the accumulo namespace are rejected
+    assertFalse(notAccumuloNamespace.matcher("accumulo.root").matches());
+    assertFalse(notAccumuloNamespace.matcher("accumulo.metadata").matches());
+    // Any other namespace is accepted
+    assertTrue(notAccumuloNamespace.matcher("test.table").matches());
+  }
+
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
new file mode 100644
index 0000000000..e14834cf48
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TApplicationException;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import com.google.common.collect.Iterables;
+
+public class ScanServerAllowedTablesIT extends SharedMiniClusterBase {
+
+  /*
+   * Scan server selector profiles passed to the client. The first profile is the default one
+   * and has no scan type activations; the second targets group GROUP1 and is activated by the
+   * scan type "use_group1".
+   */
+  // @formatter:off
+  private static final String clientConfiguration =
+      "["+
+      " {"+
+      " \"isDefault\": true,"+
+      " \"maxBusyTimeout\": \"5m\","+
+      " \"busyTimeoutMultiplier\": 8,"+
+      " \"scanTypeActivations\": [],"+
+      " \"attemptPlans\": ["+
+      " {"+
+      " \"servers\": \"3\","+
+      " \"busyTimeout\": \"33ms\","+
+      " \"salt\": \"one\""+
+      " },"+
+      " {"+
+      " \"servers\": \"13\","+
+      " \"busyTimeout\": \"33ms\","+
+      " \"salt\": \"two\""+
+      " },"+
+      " {"+
+      " \"servers\": \"100%\","+
+      " \"busyTimeout\": \"33ms\""+
+      " }"+
+      " ]"+
+      " },"+
+      " {"+
+      " \"isDefault\": false,"+
+      " \"maxBusyTimeout\": \"5m\","+
+      " \"busyTimeoutMultiplier\": 8,"+
+      " \"group\": \"GROUP1\","+
+      " \"scanTypeActivations\": [\"use_group1\"],"+
+      " \"attemptPlans\": ["+
+      " {"+
+      " \"servers\": \"3\","+
+      " \"busyTimeout\": \"33ms\","+
+      " \"salt\": \"one\""+
+      " },"+
+      " {"+
+      " \"servers\": \"13\","+
+      " \"busyTimeout\": \"33ms\","+
+      " \"salt\": \"two\""+
+      " },"+
+      " {"+
+      " \"servers\": \"100%\","+
+      " \"busyTimeout\": \"33ms\""+
+      " }"+
+      " ]"+
+      " }"+
+      "]";
+  // @formatter:on
+
+  /**
+   * Mini cluster callback for this test: one scan server, a per-group allowed-tables expression
+   * for the default group and for GROUP1, and the client-side selector profiles.
+   */
+  public static class SSATITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+
+      cfg.setNumScanServers(1);
+
+      final String allowedTablesPrefix = Property.SSERV_SCAN_ALLOWED_TABLES.getKey();
+
+      // Default group: only names beginning with "accumulo." match
+      final String defaultGroupProp =
+          allowedTablesPrefix + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
+      cfg.setProperty(defaultGroupProp, "^accumulo\\..*$");
+
+      // GROUP1: only names beginning with "test" match
+      final String group1Prop = allowedTablesPrefix + "GROUP1";
+      cfg.setProperty(group1Prop, "^test.*");
+
+      final String profilesProp =
+          ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles";
+      cfg.setClientProperty(profilesProp, clientConfiguration);
+    }
+
+  }
+
+  /**
+   * Starts the mini cluster with {@link SSATITConfiguration}, starts a scan server and waits
+   * until at least one scan server entry is present in ZooKeeper.
+   */
+  @BeforeAll
+  public static void start() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new SSATITConfiguration());
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    final String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    final ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    final String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    // Use the Wait helper, as the test method does, rather than a hand-rolled
+    // sleep loop that has no timeout of its own.
+    Wait.waitFor(() -> !zrw.getChildren(scanServerRoot).isEmpty());
+  }
+
+  /** Stops the shared mini cluster after all tests in this class have run. */
+  @AfterAll
+  public static void stop() throws Exception {
+    stopMiniCluster();
+  }
+
+ public static enum ScannerType {
+ BATCH_SCANNER, SCANNER;
+ }
+
+ private ScannerBase createScanner(AccumuloClient client, ScannerType stype,
String tableName)
+ throws TableNotFoundException {
+ switch (stype) {
+ case BATCH_SCANNER:
+ BatchScanner batchScanner = client.createBatchScanner(tableName,
Authorizations.EMPTY);
+ batchScanner.setRanges(Set.of(new Range()));
+ return batchScanner;
+ case SCANNER:
+ Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY);
+ scanner.setRange(new Range());
+ return scanner;
+ default:
+ throw new IllegalArgumentException("Unknown scanner type: " + stype);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ @ParameterizedTest
+ @EnumSource(value = ScannerType.class)
+ public void testAllowedTables(ScannerType stype) throws Exception {
+
+ final String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+ final ZooKeeper zk =
getCluster().getServerContext().getZooReaderWriter().getZooKeeper();
+ final String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+
+ // Start the 2nd ScanServer
+ // Bump the number of scan serves that can run to start the GROUP1 scan
server
+ getCluster().getConfig().setNumScanServers(2);
+ getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(),
+ new String[] {"-g", "GROUP1"});
+ Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2);
+ Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream().anyMatch(
+ (p) ->
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) ==
true);
+ Wait.waitFor(() -> ((ClientContext)
client).getScanServers().values().stream()
+ .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true);
+
+ // Create table with test prefix, load some data
+ final String testTableName = "testAllowedTables" + stype.name();
+ final int ingestedEntryCount =
+ ScanServerIT.createTableAndIngest(client, testTableName, null, 10,
10, "colf");
+ assertEquals(100, ingestedEntryCount);
+
+ // Using default ScanServer should succeed, only allowed to scan system
tables
+ try (ScannerBase scanner = createScanner(client, stype,
MetadataTable.NAME)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertTrue(Iterables.size(scanner) > 0);
+ }
+
+ // Using default ScanServer should fail, only allowed to scan system
tables
+ try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ RuntimeException re = assertThrows(RuntimeException.class, () ->
Iterables.size(scanner));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertTrue(root instanceof TApplicationException);
+ TApplicationException tae = (TApplicationException) root;
+ assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+ assertTrue(tae.getMessage().contains("disallowed by property"));
+ }
+
+ // Using GROUP1 ScanServer should fail, only allowed to test tables
+ try (ScannerBase scanner = createScanner(client, stype,
MetadataTable.NAME)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+ RuntimeException re = assertThrows(RuntimeException.class, () ->
Iterables.size(scanner));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertTrue(root instanceof TApplicationException);
+ TApplicationException tae = (TApplicationException) root;
+ assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+ assertTrue(tae.getMessage().contains("disallowed by property"));
+ }
+
+ // Using GROUP1 ScanServer should succeed
+ try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+ assertEquals(100, Iterables.size(scanner));
+ }
+
+ // Change the GROUP1 property so that subsequent test tables don't work
+ getCluster().getServerContext().instanceOperations()
+ .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1",
"^foo.*");
+
+ // Using GROUP1 ScanServer should fail, only allowed to test 'test*'
tables
+ try (ScannerBase scanner = createScanner(client, stype,
MetadataTable.NAME)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+ RuntimeException re = assertThrows(RuntimeException.class, () ->
Iterables.size(scanner));
+ Throwable root = ExceptionUtils.getRootCause(re);
+ assertTrue(root instanceof TApplicationException);
+ TApplicationException tae = (TApplicationException) root;
+ assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType());
+ assertTrue(tae.getMessage().contains("disallowed by property"));
+ }
+
+ // Using GROUP1 ScanServer should fail as the property was changed
+ try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+ // Try multiple times waiting for the server to pick up the property
change
+ Wait.waitFor(() -> {
+ try {
+ var unused = Iterables.size(scanner);
+ return false;
+ } catch (RuntimeException e) {
+ return true;
+ }
+ });
+ }
+
+ // Change the GROUP1 property so that subsequent test tables do work
+ getCluster().getServerContext().instanceOperations()
+ .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1",
"^test.*");
+
+ // Using GROUP1 ScanServer should succeed as the property was changed
back
+ try (ScannerBase scanner = createScanner(client, stype, testTableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+ // Try multiple times waiting for the server to pick up the property
change
+ Wait.waitFor(() -> {
+ try {
+ int size = Iterables.size(scanner);
+ return size == 100;
+ } catch (RuntimeException e) {
+ return false;
+ }
+ });
+
+ }
+
+ }
+
+ }
+
+}