This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch revert-3102-2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit cf0018e5909f3c19d566c7bb06a118d815608351 Author: Dave Marion <[email protected]> AuthorDate: Thu Dec 1 13:41:51 2022 -0500 Revert "Revert "Allow ScanServers to scan offline tables (#3082)" (#3101) (#3102)" This reverts commit 02bd367577b8aa8d8a0bea629fd548ebab04611e. --- .../apache/accumulo/core/clientImpl/ClientContext.java | 7 +++---- .../apache/accumulo/core/clientImpl/ScannerImpl.java | 11 +++++++++++ .../core/clientImpl/TabletServerBatchReader.java | 4 ++++ .../java/org/apache/accumulo/test/ScanServerIT.java | 17 ++++++++--------- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 25eba7ecc4..cdad4d673e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -689,8 +689,8 @@ public class ClientContext implements AccumuloClient { int numQueryThreads) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName), - tableName, authorizations, numQueryThreads); + return new TabletServerBatchReader(this, getTableId(tableName), tableName, authorizations, + numQueryThreads); } @Override @@ -777,8 +777,7 @@ public class ClientContext implements AccumuloClient { throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - Scanner scanner = - new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations); + Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations); Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties()); if (batchSize != null) { scanner.setBatchSize(batchSize); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java index c418e5a412..5fbacd7f7f 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -152,6 +153,16 @@ public class ScannerImpl extends ScannerOptions implements Scanner { @Override public synchronized Iterator<Entry<Key,Value>> iterator() { ensureOpen(); + + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + try { + String tableName = context.getTableName(tableId); + context.requireNotOffline(tableId, tableName); + } catch (TableNotFoundException e) { + throw new RuntimeException("Table not found", e); + } + } + ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size, getTimeout(SECONDS), this, isolated, readaheadThreshold, new Reporter()); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index c0de759516..9508f06aa6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -115,6 +115,10 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan throw new IllegalStateException("batch reader closed"); } + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + context.requireNotOffline(tableId, tableName); + } + return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges, numThreads, queryThreadPool, this, timeOut); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 528fd7f441..f9557859a9 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -36,7 +36,6 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.ClientProperty; @@ -150,19 +149,19 @@ public class ScanServerIT extends SharedMiniClusterBase { @Test public void testScanOfflineTable() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0]; - createTableAndIngest(client, tableName, null, 10, 10, "colf"); + final int ingestedEntryCount = createTableAndIngest(client, tableName, null, 10, 10, "colf"); client.tableOperations().offline(tableName, true); - assertThrows(TableOfflineException.class, () -> { - try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(new Range()); - scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); - assertEquals(100, Iterables.size(scanner)); - } // when the scanner is closed, all open sessions should be closed - }); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed } }
