This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new fc2a2e7dd3 Partially revert 1414641 to resurrect 2.1 upgrade code 
(#3999)
fc2a2e7dd3 is described below

commit fc2a2e7dd32ac52da3e861c2f05b126b3b07d7b0
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Nov 30 15:24:11 2023 -0500

    Partially revert 1414641 to resurrect 2.1 upgrade code (#3999)
---
 .../accumulo/server/AccumuloDataVersion.java       |  12 +-
 .../apache/accumulo/server/ServerContextTest.java  |   2 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   6 +-
 .../accumulo/manager/upgrade/Upgrader10to11.java   | 268 +++++++++++++++++++++
 .../manager/upgrade/Upgrader10to11Test.java        | 192 +++++++++++++++
 5 files changed, 474 insertions(+), 6 deletions(-)

diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java 
b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
index a96721988e..9f459ea599 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
@@ -37,7 +37,7 @@ import java.util.Set;
 public class AccumuloDataVersion {
 
   /**
-   * version (12) reflects On-Demand tablets starting with 4.0
+   * version (13) reflects On-Demand tablets starting with 4.0
    */
   public static final int ONDEMAND_TABLETS_FOR_VERSION_4 = 13;
 
@@ -52,11 +52,16 @@ public class AccumuloDataVersion {
    */
   public static final int REMOVE_DEPRECATIONS_FOR_VERSION_3 = 11;
 
+  /**
+   * version (10) reflects changes to how root tablet metadata is serialized 
in zookeeper starting
+   * with 2.1. See {@link 
org.apache.accumulo.core.metadata.schema.RootTabletMetadata}.
+   */
+  public static final int ROOT_TABLET_META_CHANGES = 10;
+
   /**
    * Historic data versions
    *
    * <ul>
-   * <li>version (10) Changes to how root tablet metadata is serialized in 
zookeeper in 2.1.0</li>
    * <li>version (9) RFiles and wal crypto serialization changes. RFile 
summary data in 2.0.0</li>
    * <li>version (8) RFile index (ACCUMULO-1124) and wal tracking in ZK in 
1.8.0</li>
    * <li>version (7) also reflects the addition of a replication table in 1.7.0
@@ -78,6 +83,7 @@ public class AccumuloDataVersion {
   }
 
   // ELASTICITY_TODO get upgrade working
+  // public static final Set<Integer> CAN_RUN = 
Set.of(ROOT_TABLET_META_CHANGES, CURRENT_VERSION);
   public static final Set<Integer> CAN_RUN = Set.of(CURRENT_VERSION);
 
   /**
@@ -99,6 +105,8 @@ public class AccumuloDataVersion {
 
   private static String dataVersionToReleaseName(final int version) {
     switch (version) {
+      case ROOT_TABLET_META_CHANGES:
+        return "2.1.0";
       case REMOVE_DEPRECATIONS_FOR_VERSION_3:
         return "3.0.0";
       case METADATA_FILE_JSON_ENCODING:
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java 
b/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java
index 7694b34166..f86127ea9b 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java
@@ -138,7 +138,7 @@ public class ServerContextTest {
 
     final int oldestSupported = 
AccumuloDataVersion.ONDEMAND_TABLETS_FOR_VERSION_4;
     // ELASTICITY_TODO basically disable check until upgrade to 3.1 is 
supported. Should be:
-    // final int oldestSupported = 
AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
+    // final int oldestSupported = 
AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
 
     final int currentVersion = AccumuloDataVersion.get();
     IntConsumer shouldPass = ServerContext::ensureDataVersionCompatible;
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index dedda83872..94a81eaaf9 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.accumulo.manager.upgrade;
 
-import static 
org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3;
+import static 
org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -112,8 +112,8 @@ public class UpgradeCoordinator {
   private int currentVersion;
   // map of "current version" -> upgrader to next version.
   // Sorted so upgrades execute in order from the oldest supported data 
version to current
-  private final Map<Integer,Upgrader> upgraders = Collections.unmodifiableMap(
-      new TreeMap<>(Map.of(REMOVE_DEPRECATIONS_FOR_VERSION_3, new 
Upgrader11to12())));
+  private final Map<Integer,Upgrader> upgraders = Collections
+      .unmodifiableMap(new TreeMap<>(Map.of(ROOT_TABLET_META_CHANGES, new 
Upgrader10to11())));
 
   private volatile UpgradeStatus status;
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java
new file mode 100644
index 0000000000..33d35e1d41
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.upgrade;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_STATE;
+import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX;
+import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreKey;
+import org.apache.accumulo.server.conf.store.TablePropKey;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Upgrader10to11 implements Upgrader {
+
+  private static final Logger log = 
LoggerFactory.getLogger(Upgrader10to11.class);
+
+  // Included for upgrade code usage any other usage post 3.0 should not be 
used.
+  private static final TableId REPLICATION_ID = TableId.of("+rep");
+
+  private static final Range REP_TABLE_RANGE =
+      new Range(REPLICATION_ID.canonical() + ";", true, 
REPLICATION_ID.canonical() + "<", true);
+
+  // copied from MetadataSchema 2.1 (removed in 3.0)
+  private static final Range REP_WAL_RANGE =
+      new Range(RESERVED_PREFIX + "repl", true, RESERVED_PREFIX + "repm", 
false);
+
+  public Upgrader10to11() {
+    super();
+  }
+
+  @Override
+  public void upgradeZookeeper(final ServerContext context) {
+    log.info("upgrade of ZooKeeper entries");
+
+    var zrw = context.getZooReaderWriter();
+    var iid = context.getInstanceID();
+
+    // if the replication base path (../tables/+rep) assume removed or never 
existed.
+    if (!checkReplicationTableInZk(iid, zrw)) {
+      log.debug("replication table root node does not exist in ZooKeeper - 
nothing to do");
+      return;
+    }
+
+    // if the replication table is online - stop. There could be data in 
transit.
+    if (!checkReplicationOffline(iid, zrw)) {
+      throw new IllegalStateException(
+          "Replication table is not offline. Cannot continue with upgrade that 
will remove replication with replication active");
+    }
+
+    cleanMetaConfig(iid, context.getPropStore());
+
+    deleteReplicationTableZkEntries(zrw, iid);
+
+  }
+
+  @Override
+  public void upgradeRoot(final ServerContext context) {
+    log.info("upgrade root - skipping, nothing to do");
+  }
+
+  @Override
+  public void upgradeMetadata(final ServerContext context) {
+    log.info("upgrade metadata entries");
+    List<String> replTableFiles = readReplFilesFromMetadata(context);
+    deleteReplMetadataEntries(context);
+    deleteReplTableFiles(context, replTableFiles);
+  }
+
+  List<String> readReplFilesFromMetadata(final ServerContext context) {
+    List<String> results = new ArrayList<>();
+    try (Scanner scanner = context.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY)) {
+      
scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+      scanner.setRange(REP_TABLE_RANGE);
+      for (Map.Entry<Key,Value> entry : scanner) {
+        String f = entry.getKey()
+            
.getColumnQualifier(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME).toString();
+        results.add(f);
+      }
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("failed to read replication files from 
metadata", ex);
+    }
+    return results;
+  }
+
+  void deleteReplTableFiles(final ServerContext context, final List<String> 
replTableFiles) {
+    // short circuit if there are no files
+    if (replTableFiles.isEmpty()) {
+      return;
+    }
+    // write delete mutations
+    boolean haveFailures = false;
+    try (BatchWriter writer = context.createBatchWriter(MetadataTable.NAME)) {
+      for (String filename : replTableFiles) {
+        Mutation m = createDelMutation(filename);
+        log.debug("Adding delete marker for file: {}", filename);
+        writer.addMutation(m);
+      }
+    } catch (MutationsRejectedException ex) {
+      log.debug("Failed to write delete marker {}", ex.getMessage());
+      haveFailures = true;
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("failed to read replication files from 
metadata", ex);
+    }
+    if (haveFailures) {
+      throw new IllegalStateException(
+          "deletes rejected adding deletion marker for replication file 
entries, check log");
+    }
+  }
+
+  private Mutation createDelMutation(String path) {
+    Mutation delFlag = new Mutation(new 
Text(MetadataSchema.DeletesSection.encodeRow(path)));
+    delFlag.put(EMPTY_TEXT, EMPTY_TEXT, 
MetadataSchema.DeletesSection.SkewedKeyValue.NAME);
+    return delFlag;
+  }
+
+  /**
+   * remove +rep entries from metadata.
+   */
+  private void deleteReplMetadataEntries(final ServerContext context) {
+    try (BatchDeleter deleter =
+        context.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 
10)) {
+      deleter.setRanges(List.of(REP_TABLE_RANGE, REP_WAL_RANGE));
+      deleter.delete();
+    } catch (TableNotFoundException | MutationsRejectedException ex) {
+      throw new IllegalStateException("failed to remove replication info from 
metadata table", ex);
+    }
+  }
+
+  private boolean checkReplicationTableInZk(final InstanceId iid, final 
ZooReaderWriter zrw) {
+    try {
+      String path = buildRepTablePath(iid);
+      return zrw.exists(path);
+    } catch (KeeperException ex) {
+      throw new IllegalStateException("ZooKeeper error - cannot determine 
replication table status",
+          ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("interrupted reading replication state 
from ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * To protect against removing replication information if replication is 
being used and possible
+   * active, check the replication table state in Zookeeper to see if it is 
ONLINE (active) or
+   * OFFLINE (inactive). If the state node does not exist, then the status is 
considered as OFFLINE.
+   *
+   * @return true if the replication table state is OFFLINE, false otherwise
+   */
+  private boolean checkReplicationOffline(final InstanceId iid, final 
ZooReaderWriter zrw) {
+    try {
+      String path = buildRepTablePath(iid) + ZTABLE_STATE;
+      byte[] bytes = zrw.getData(path);
+      if (bytes != null && bytes.length > 0) {
+        String status = new String(bytes, UTF_8);
+        return TableState.OFFLINE.name().equals(status);
+      }
+      return false;
+    } catch (KeeperException ex) {
+      throw new IllegalStateException("ZooKeeper error - cannot determine 
replication table status",
+          ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("interrupted reading replication state 
from ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * Utility method to build the ZooKeeper replication table path. The path 
resolves to
+   * {@code /accumulo/INSTANCE_ID/tables/+rep}
+   */
+  static String buildRepTablePath(final InstanceId iid) {
+    return ZooUtil.getRoot(iid) + ZTABLES + "/" + REPLICATION_ID.canonical();
+  }
+
+  private void deleteReplicationTableZkEntries(ZooReaderWriter zrw, InstanceId 
iid) {
+    String repTablePath = buildRepTablePath(iid);
+    try {
+      zrw.recursiveDelete(repTablePath, ZooUtil.NodeMissingPolicy.SKIP);
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "ZooKeeper error - failed recursive deletion on " + repTablePath, 
ex);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("interrupted deleting " + repTablePath + 
" from ZooKeeper",
+          ex);
+    }
+  }
+
+  private void cleanMetaConfig(final InstanceId iid, final PropStore 
propStore) {
+    PropStoreKey<TableId> metaKey = TablePropKey.of(iid, MetadataTable.ID);
+    var p = propStore.get(metaKey);
+    var props = p.asMap();
+    List<String> filtered = filterReplConfigKeys(props.keySet());
+    // add replication status formatter to remove list.
+    String v = props.get("table.formatter");
+    if (v != null && 
v.compareTo("org.apache.accumulo.server.replication.StatusFormatter") == 0) {
+      filtered.add("table.formatter");
+    }
+
+    if (filtered.size() > 0) {
+      log.trace("Upgrade filtering replication iterators for id: {}", metaKey);
+      propStore.removeProperties(metaKey, filtered);
+    }
+  }
+
+  /**
+   * Return a list of property keys that match replication iterator settings. 
This is specifically a
+   * narrow filter to avoid potential matches with user define or properties 
that contain
+   * replication in the property name (specifically table.file.replication 
which set hdfs block
+   * replication.)
+   */
+  private List<String> filterReplConfigKeys(Set<String> keys) {
+    String REPL_ITERATOR_PATTERN = 
"^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$";
+    String REPL_COLUMN_PATTERN =
+        "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$";
+
+    Pattern p = Pattern.compile("(" + REPL_ITERATOR_PATTERN + "|" + 
REPL_COLUMN_PATTERN + ")");
+
+    return keys.stream().filter(e -> 
p.matcher(e).find()).collect(Collectors.toList());
+  }
+}
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java
new file mode 100644
index 0000000000..335488c424
--- /dev/null
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.upgrade;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.Constants.ZTABLE_STATE;
+import static 
org.apache.accumulo.manager.upgrade.Upgrader10to11.buildRepTablePath;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.TablePropKey;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class Upgrader10to11Test {
+  private static final Logger log = 
LoggerFactory.getLogger(Upgrader10to11Test.class);
+
+  private InstanceId instanceId = null;
+  private ServerContext context = null;
+  private ZooReaderWriter zrw = null;
+
+  private PropStore propStore = null;
+
+  @BeforeEach
+  public void initMocks() {
+    instanceId = InstanceId.of(UUID.randomUUID());
+    context = createMock(ServerContext.class);
+    zrw = createMock(ZooReaderWriter.class);
+    propStore = createMock(PropStore.class);
+
+    expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
+    expect(context.getInstanceID()).andReturn(instanceId).anyTimes();
+  }
+
+  @Test
+  void upgradeZooKeeperGoPath() throws Exception {
+
+    expect(context.getPropStore()).andReturn(propStore).anyTimes();
+    expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once();
+    expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE))
+        .andReturn(TableState.OFFLINE.name().getBytes(UTF_8)).once();
+    zrw.recursiveDelete(buildRepTablePath(instanceId), 
ZooUtil.NodeMissingPolicy.SKIP);
+    expectLastCall().once();
+
+    expect(propStore.get(TablePropKey.of(instanceId, MetadataTable.ID)))
+        .andReturn(new VersionedProperties()).once();
+
+    replay(context, zrw, propStore);
+
+    Upgrader10to11 upgrader = new Upgrader10to11();
+    upgrader.upgradeZookeeper(context);
+
+    verify(context, zrw);
+  }
+
+  @Test
+  void upgradeZookeeperNoReplTableNode() throws Exception {
+
+    expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(false).once();
+    replay(context, zrw);
+
+    Upgrader10to11 upgrader = new Upgrader10to11();
+    upgrader.upgradeZookeeper(context);
+
+    verify(context, zrw);
+  }
+
+  @Test
+  void checkReplicationStateOffline() throws Exception {
+
+    expect(context.getPropStore()).andReturn(propStore).anyTimes();
+    expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once();
+    expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE))
+        .andReturn(TableState.OFFLINE.name().getBytes(UTF_8)).once();
+    zrw.recursiveDelete(buildRepTablePath(instanceId), 
ZooUtil.NodeMissingPolicy.SKIP);
+    expectLastCall().once();
+    expect(propStore.get(TablePropKey.of(instanceId, MetadataTable.ID)))
+        .andReturn(new VersionedProperties()).once();
+
+    replay(context, zrw, propStore);
+
+    Upgrader10to11 upgrader = new Upgrader10to11();
+
+    upgrader.upgradeZookeeper(context);
+
+    verify(context, zrw);
+  }
+
+  @Test
+  void checkReplicationStateOnline() throws Exception {
+    expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once();
+    expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE))
+        .andReturn(TableState.ONLINE.name().getBytes(UTF_8)).anyTimes();
+    replay(context, zrw);
+
+    Upgrader10to11 upgrader = new Upgrader10to11();
+    assertThrows(IllegalStateException.class, () -> 
upgrader.upgradeZookeeper(context));
+
+    verify(context, zrw);
+  }
+
+  @Test
+  void checkReplicationStateNoNode() throws Exception {
+    expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once();
+    expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE))
+        .andThrow(new KeeperException.NoNodeException("force no node 
exception")).anyTimes();
+    replay(context, zrw);
+
+    Upgrader10to11 upgrader = new Upgrader10to11();
+    assertThrows(IllegalStateException.class, () -> 
upgrader.upgradeZookeeper(context));
+
+    verify(context, zrw);
+  }
+
+  @Test
+  public void filterTest() {
+    Map<String,String> entries = new HashMap<>();
+    entries.put("table.file.compress.blocksize", "32K");
+    entries.put("table.file.replication", "5");
+    entries.put("table.group.server", "file,log,srv,future");
+    entries.put("table.iterator.majc.bulkLoadFilter",
+        "20,org.apache.accumulo.server.iterators.MetadataBulkLoadFilter");
+    entries.put("table.iterator.majc.replcombiner",
+        "9,org.apache.accumulo.server.replication.StatusCombiner");
+    entries.put("table.iterator.majc.replcombiner.opt.columns", "stat");
+    entries.put("table.iterator.majc.vers",
+        "10,org.apache.accumulo.core.iterators.user.VersioningIterator");
+    entries.put("table.iterator.majc.vers.opt.maxVersions", "1");
+    entries.put("table.iterator.minc.replcombiner",
+        "9,org.apache.accumulo.server.replication.StatusCombiner");
+    entries.put("table.iterator.minc.replcombiner.opt.columns", "stat");
+    entries.put("table.iterator.minc.vers",
+        "10,org.apache.accumulo.core.iterators.user.VersioningIterator");
+    entries.put("table.iterator.minc.vers.opt.maxVersions", "1");
+    entries.put("table.iterator.scan.replcombiner",
+        "9,org.apache.accumulo.server.replication.StatusCombiner");
+    entries.put("table.iterator.scan.replcombiner.opt.columns", "stat");
+    entries.put("table.iterator.scan.vers",
+        "10,org.apache.accumulo.core.iterators.user.VersioningIterator");
+
+    String REPL_ITERATOR_PATTERN = 
"^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$";
+    String REPL_COLUMN_PATTERN =
+        "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$";
+
+    Pattern p = Pattern.compile("(" + REPL_ITERATOR_PATTERN + "|" + 
REPL_COLUMN_PATTERN + ")");
+
+    List<String> filtered =
+        entries.keySet().stream().filter(e -> 
p.matcher(e).find()).collect(Collectors.toList());
+
+    assertEquals(6, filtered.size());
+    log.info("F:{}", filtered);
+  }
+}

Reply via email to