This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new fc2a2e7dd3 Partially revert 1414641 to resurrect 2.1 upgrade code (#3999) fc2a2e7dd3 is described below commit fc2a2e7dd32ac52da3e861c2f05b126b3b07d7b0 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Nov 30 15:24:11 2023 -0500 Partially revert 1414641 to resurrect 2.1 upgrade code (#3999) --- .../accumulo/server/AccumuloDataVersion.java | 12 +- .../apache/accumulo/server/ServerContextTest.java | 2 +- .../manager/upgrade/UpgradeCoordinator.java | 6 +- .../accumulo/manager/upgrade/Upgrader10to11.java | 268 +++++++++++++++++++++ .../manager/upgrade/Upgrader10to11Test.java | 192 +++++++++++++++ 5 files changed, 474 insertions(+), 6 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java index a96721988e..9f459ea599 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java @@ -37,7 +37,7 @@ import java.util.Set; public class AccumuloDataVersion { /** - * version (12) reflects On-Demand tablets starting with 4.0 + * version (13) reflects On-Demand tablets starting with 4.0 */ public static final int ONDEMAND_TABLETS_FOR_VERSION_4 = 13; @@ -52,11 +52,16 @@ public class AccumuloDataVersion { */ public static final int REMOVE_DEPRECATIONS_FOR_VERSION_3 = 11; + /** + * version (10) reflects changes to how root tablet metadata is serialized in zookeeper starting + * with 2.1. See {@link org.apache.accumulo.core.metadata.schema.RootTabletMetadata}. + */ + public static final int ROOT_TABLET_META_CHANGES = 10; + /** * Historic data versions * * <ul> - * <li>version (10) Changes to how root tablet metadata is serialized in zookeeper in 2.1.0</li> * <li>version (9) RFiles and wal crypto serialization changes. 
RFile summary data in 2.0.0</li> * <li>version (8) RFile index (ACCUMULO-1124) and wal tracking in ZK in 1.8.0</li> * <li>version (7) also reflects the addition of a replication table in 1.7.0 @@ -78,6 +83,7 @@ public class AccumuloDataVersion { } // ELASTICITY_TODO get upgrade working + // public static final Set<Integer> CAN_RUN = Set.of(ROOT_TABLET_META_CHANGES, CURRENT_VERSION); public static final Set<Integer> CAN_RUN = Set.of(CURRENT_VERSION); /** @@ -99,6 +105,8 @@ public class AccumuloDataVersion { private static String dataVersionToReleaseName(final int version) { switch (version) { + case ROOT_TABLET_META_CHANGES: + return "2.1.0"; case REMOVE_DEPRECATIONS_FOR_VERSION_3: return "3.0.0"; case METADATA_FILE_JSON_ENCODING: diff --git a/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java b/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java index 7694b34166..f86127ea9b 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/ServerContextTest.java @@ -138,7 +138,7 @@ public class ServerContextTest { final int oldestSupported = AccumuloDataVersion.ONDEMAND_TABLETS_FOR_VERSION_4; // ELASTICITY_TODO basically disable check until upgrade to 3.1 is supported. 
Should be: - // final int oldestSupported = AccumuloDataVersion.METADATA_FILE_JSON_ENCODING; + // final int oldestSupported = AccumuloDataVersion.ROOT_TABLET_META_CHANGES; final int currentVersion = AccumuloDataVersion.get(); IntConsumer shouldPass = ServerContext::ensureDataVersionCompatible; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index dedda83872..94a81eaaf9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.manager.upgrade; -import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3; +import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES; import java.io.IOException; import java.util.Collections; @@ -112,8 +112,8 @@ public class UpgradeCoordinator { private int currentVersion; // map of "current version" -> upgrader to next version. 
// Sorted so upgrades execute in order from the oldest supported data version to current - private final Map<Integer,Upgrader> upgraders = Collections.unmodifiableMap( - new TreeMap<>(Map.of(REMOVE_DEPRECATIONS_FOR_VERSION_3, new Upgrader11to12()))); + private final Map<Integer,Upgrader> upgraders = Collections + .unmodifiableMap(new TreeMap<>(Map.of(ROOT_TABLET_META_CHANGES, new Upgrader10to11()))); private volatile UpgradeStatus status; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java new file mode 100644 index 0000000000..33d35e1d41 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader10to11.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.upgrade; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.Constants.ZTABLES; +import static org.apache.accumulo.core.Constants.ZTABLE_STATE; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX; +import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreKey; +import org.apache.accumulo.server.conf.store.TablePropKey; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Upgrader10to11 implements Upgrader { + + private static final Logger log = 
LoggerFactory.getLogger(Upgrader10to11.class); + + // Included for upgrade code only. It should not be used for anything else post 3.0. + private static final TableId REPLICATION_ID = TableId.of("+rep"); + + private static final Range REP_TABLE_RANGE = + new Range(REPLICATION_ID.canonical() + ";", true, REPLICATION_ID.canonical() + "<", true); + + // copied from MetadataSchema 2.1 (removed in 3.0) + private static final Range REP_WAL_RANGE = + new Range(RESERVED_PREFIX + "repl", true, RESERVED_PREFIX + "repm", false); + + public Upgrader10to11() { + super(); + } + + @Override + public void upgradeZookeeper(final ServerContext context) { + log.info("upgrade of ZooKeeper entries"); + + var zrw = context.getZooReaderWriter(); + var iid = context.getInstanceID(); + + // if the replication base path (../tables/+rep) is missing, assume removed or never existed. + if (!checkReplicationTableInZk(iid, zrw)) { + log.debug("replication table root node does not exist in ZooKeeper - nothing to do"); + return; + } + + // if the replication table is online - stop. There could be data in transit. + if (!checkReplicationOffline(iid, zrw)) { + throw new IllegalStateException( + "Replication table is not offline. 
Cannot continue with upgrade that will remove replication with replication active"); + } + + cleanMetaConfig(iid, context.getPropStore()); + + deleteReplicationTableZkEntries(zrw, iid); + + } + + @Override + public void upgradeRoot(final ServerContext context) { + log.info("upgrade root - skipping, nothing to do"); + } + + @Override + public void upgradeMetadata(final ServerContext context) { + log.info("upgrade metadata entries"); + List<String> replTableFiles = readReplFilesFromMetadata(context); + deleteReplMetadataEntries(context); + deleteReplTableFiles(context, replTableFiles); + } + + List<String> readReplFilesFromMetadata(final ServerContext context) { + List<String> results = new ArrayList<>(); + try (Scanner scanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + scanner.setRange(REP_TABLE_RANGE); + for (Map.Entry<Key,Value> entry : scanner) { + String f = entry.getKey() + .getColumnQualifier(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME).toString(); + results.add(f); + } + } catch (TableNotFoundException ex) { + throw new IllegalStateException("failed to read replication files from metadata", ex); + } + return results; + } + + void deleteReplTableFiles(final ServerContext context, final List<String> replTableFiles) { + // short circuit if there are no files + if (replTableFiles.isEmpty()) { + return; + } + // write delete mutations + boolean haveFailures = false; + try (BatchWriter writer = context.createBatchWriter(MetadataTable.NAME)) { + for (String filename : replTableFiles) { + Mutation m = createDelMutation(filename); + log.debug("Adding delete marker for file: {}", filename); + writer.addMutation(m); + } + } catch (MutationsRejectedException ex) { + log.debug("Failed to write delete marker {}", ex.getMessage()); + haveFailures = true; + } catch (TableNotFoundException ex) { + throw new IllegalStateException("failed to read 
replication files from metadata", ex); + } + if (haveFailures) { + throw new IllegalStateException( + "deletes rejected adding deletion marker for replication file entries, check log"); + } + } + + private Mutation createDelMutation(String path) { + Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path))); + delFlag.put(EMPTY_TEXT, EMPTY_TEXT, MetadataSchema.DeletesSection.SkewedKeyValue.NAME); + return delFlag; + } + + /** + * remove +rep entries from metadata. + */ + private void deleteReplMetadataEntries(final ServerContext context) { + try (BatchDeleter deleter = + context.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 10)) { + deleter.setRanges(List.of(REP_TABLE_RANGE, REP_WAL_RANGE)); + deleter.delete(); + } catch (TableNotFoundException | MutationsRejectedException ex) { + throw new IllegalStateException("failed to remove replication info from metadata table", ex); + } + } + + private boolean checkReplicationTableInZk(final InstanceId iid, final ZooReaderWriter zrw) { + try { + String path = buildRepTablePath(iid); + return zrw.exists(path); + } catch (KeeperException ex) { + throw new IllegalStateException("ZooKeeper error - cannot determine replication table status", + ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("interrupted reading replication state from ZooKeeper", ex); + } + } + + /** + * To protect against removing replication information if replication is being used and possibly + * active, check the replication table state in ZooKeeper to see if it is ONLINE (active) or + * OFFLINE (inactive). An empty state node counts as not OFFLINE; a missing node is an error. 
+ * + * @return true if the replication table state is OFFLINE, false otherwise + */ + private boolean checkReplicationOffline(final InstanceId iid, final ZooReaderWriter zrw) { + try { + String path = buildRepTablePath(iid) + ZTABLE_STATE; + byte[] bytes = zrw.getData(path); + if (bytes != null && bytes.length > 0) { + String status = new String(bytes, UTF_8); + return TableState.OFFLINE.name().equals(status); + } + return false; + } catch (KeeperException ex) { + throw new IllegalStateException("ZooKeeper error - cannot determine replication table status", + ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("interrupted reading replication state from ZooKeeper", ex); + } + } + + /** + * Utility method to build the ZooKeeper replication table path. The path resolves to + * {@code /accumulo/INSTANCE_ID/tables/+rep} + */ + static String buildRepTablePath(final InstanceId iid) { + return ZooUtil.getRoot(iid) + ZTABLES + "/" + REPLICATION_ID.canonical(); + } + + private void deleteReplicationTableZkEntries(ZooReaderWriter zrw, InstanceId iid) { + String repTablePath = buildRepTablePath(iid); + try { + zrw.recursiveDelete(repTablePath, ZooUtil.NodeMissingPolicy.SKIP); + } catch (KeeperException ex) { + throw new IllegalStateException( + "ZooKeeper error - failed recursive deletion on " + repTablePath, ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("interrupted deleting " + repTablePath + " from ZooKeeper", + ex); + } + } + + private void cleanMetaConfig(final InstanceId iid, final PropStore propStore) { + PropStoreKey<TableId> metaKey = TablePropKey.of(iid, MetadataTable.ID); + var p = propStore.get(metaKey); + var props = p.asMap(); + List<String> filtered = filterReplConfigKeys(props.keySet()); + // add replication status formatter to remove list. 
+ String v = props.get("table.formatter"); + if (v != null && v.compareTo("org.apache.accumulo.server.replication.StatusFormatter") == 0) { + filtered.add("table.formatter"); + } + + if (filtered.size() > 0) { + log.trace("Upgrade filtering replication iterators for id: {}", metaKey); + propStore.removeProperties(metaKey, filtered); + } + } + + /** + * Return a list of property keys that match replication iterator settings. This is specifically a + * narrow filter to avoid potential matches with user-defined or other properties that contain + * replication in the property name (specifically table.file.replication, which sets hdfs block + * replication). + */ + private List<String> filterReplConfigKeys(Set<String> keys) { + String REPL_ITERATOR_PATTERN = "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$"; + String REPL_COLUMN_PATTERN = + "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$"; + + Pattern p = Pattern.compile("(" + REPL_ITERATOR_PATTERN + "|" + REPL_COLUMN_PATTERN + ")"); + + return keys.stream().filter(e -> p.matcher(e).find()).collect(Collectors.toList()); + } +} diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java new file mode 100644 index 0000000000..335488c424 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader10to11Test.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.upgrade; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.Constants.ZTABLE_STATE; +import static org.apache.accumulo.manager.upgrade.Upgrader10to11.buildRepTablePath; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.TablePropKey; +import org.apache.zookeeper.KeeperException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class Upgrader10to11Test { + private static final Logger log = 
LoggerFactory.getLogger(Upgrader10to11Test.class); + + private InstanceId instanceId = null; + private ServerContext context = null; + private ZooReaderWriter zrw = null; + + private PropStore propStore = null; + + @BeforeEach + public void initMocks() { + instanceId = InstanceId.of(UUID.randomUUID()); + context = createMock(ServerContext.class); + zrw = createMock(ZooReaderWriter.class); + propStore = createMock(PropStore.class); + + expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); + expect(context.getInstanceID()).andReturn(instanceId).anyTimes(); + } + + @Test + void upgradeZooKeeperGoPath() throws Exception { + + expect(context.getPropStore()).andReturn(propStore).anyTimes(); + expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once(); + expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE)) + .andReturn(TableState.OFFLINE.name().getBytes(UTF_8)).once(); + zrw.recursiveDelete(buildRepTablePath(instanceId), ZooUtil.NodeMissingPolicy.SKIP); + expectLastCall().once(); + + expect(propStore.get(TablePropKey.of(instanceId, MetadataTable.ID))) + .andReturn(new VersionedProperties()).once(); + + replay(context, zrw, propStore); + + Upgrader10to11 upgrader = new Upgrader10to11(); + upgrader.upgradeZookeeper(context); + + verify(context, zrw); + } + + @Test + void upgradeZookeeperNoReplTableNode() throws Exception { + + expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(false).once(); + replay(context, zrw); + + Upgrader10to11 upgrader = new Upgrader10to11(); + upgrader.upgradeZookeeper(context); + + verify(context, zrw); + } + + @Test + void checkReplicationStateOffline() throws Exception { + + expect(context.getPropStore()).andReturn(propStore).anyTimes(); + expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once(); + expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE)) + .andReturn(TableState.OFFLINE.name().getBytes(UTF_8)).once(); + zrw.recursiveDelete(buildRepTablePath(instanceId), 
ZooUtil.NodeMissingPolicy.SKIP); + expectLastCall().once(); + expect(propStore.get(TablePropKey.of(instanceId, MetadataTable.ID))) + .andReturn(new VersionedProperties()).once(); + + replay(context, zrw, propStore); + + Upgrader10to11 upgrader = new Upgrader10to11(); + + upgrader.upgradeZookeeper(context); + + verify(context, zrw); + } + + @Test + void checkReplicationStateOnline() throws Exception { + expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once(); + expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE)) + .andReturn(TableState.ONLINE.name().getBytes(UTF_8)).anyTimes(); + replay(context, zrw); + + Upgrader10to11 upgrader = new Upgrader10to11(); + assertThrows(IllegalStateException.class, () -> upgrader.upgradeZookeeper(context)); + + verify(context, zrw); + } + + @Test + void checkReplicationStateNoNode() throws Exception { + expect(zrw.exists(buildRepTablePath(instanceId))).andReturn(true).once(); + expect(zrw.getData(buildRepTablePath(instanceId) + ZTABLE_STATE)) + .andThrow(new KeeperException.NoNodeException("force no node exception")).anyTimes(); + replay(context, zrw); + + Upgrader10to11 upgrader = new Upgrader10to11(); + assertThrows(IllegalStateException.class, () -> upgrader.upgradeZookeeper(context)); + + verify(context, zrw); + } + + @Test + public void filterTest() { + Map<String,String> entries = new HashMap<>(); + entries.put("table.file.compress.blocksize", "32K"); + entries.put("table.file.replication", "5"); + entries.put("table.group.server", "file,log,srv,future"); + entries.put("table.iterator.majc.bulkLoadFilter", + "20,org.apache.accumulo.server.iterators.MetadataBulkLoadFilter"); + entries.put("table.iterator.majc.replcombiner", + "9,org.apache.accumulo.server.replication.StatusCombiner"); + entries.put("table.iterator.majc.replcombiner.opt.columns", "stat"); + entries.put("table.iterator.majc.vers", + "10,org.apache.accumulo.core.iterators.user.VersioningIterator"); + 
entries.put("table.iterator.majc.vers.opt.maxVersions", "1"); + entries.put("table.iterator.minc.replcombiner", + "9,org.apache.accumulo.server.replication.StatusCombiner"); + entries.put("table.iterator.minc.replcombiner.opt.columns", "stat"); + entries.put("table.iterator.minc.vers", + "10,org.apache.accumulo.core.iterators.user.VersioningIterator"); + entries.put("table.iterator.minc.vers.opt.maxVersions", "1"); + entries.put("table.iterator.scan.replcombiner", + "9,org.apache.accumulo.server.replication.StatusCombiner"); + entries.put("table.iterator.scan.replcombiner.opt.columns", "stat"); + entries.put("table.iterator.scan.vers", + "10,org.apache.accumulo.core.iterators.user.VersioningIterator"); + + String REPL_ITERATOR_PATTERN = "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner$"; + String REPL_COLUMN_PATTERN = + "^table\\.iterator\\.(majc|minc|scan)\\.replcombiner\\.opt\\.columns$"; + + Pattern p = Pattern.compile("(" + REPL_ITERATOR_PATTERN + "|" + REPL_COLUMN_PATTERN + ")"); + + List<String> filtered = + entries.keySet().stream().filter(e -> p.matcher(e).find()).collect(Collectors.toList()); + + assertEquals(6, filtered.size()); + log.info("F:{}", filtered); + } +}