This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new a2ce072e5c Make sure tablets with WALs are not merged (#3900) a2ce072e5c is described below commit a2ce072e5c75dc8347b6aa07375efdac5480b015 Author: Christopher L. Shannon <christopher.l.shan...@gmail.com> AuthorDate: Fri Oct 27 18:42:26 2023 -0400 Make sure tablets with WALs are not merged (#3900) Add extra verification to make sure that no tablets that are merged contain any WALs to prevent losing data Co-authored-by: Keith Turner <ktur...@apache.org> --- .../java/org/apache/accumulo/manager/Manager.java | 6 ++ .../accumulo/manager/TabletGroupWatcher.java | 28 ++++++++- .../apache/accumulo/manager/state/MergeStats.java | 22 ++++++- .../accumulo/manager/state/MergeStatsTest.java | 71 ++++++++++++++++++++++ .../apache/accumulo/test/manager/MergeStateIT.java | 19 ++++++ 5 files changed, 142 insertions(+), 4 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b86cba576c..196b50d76c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -658,6 +658,12 @@ public class Manager extends AbstractServer return TabletGoalState.HOSTED; case WAITING_FOR_OFFLINE: + // If we have walogs we need to be HOSTED to recover + if (!tls.walogs.isEmpty()) { + return TabletGoalState.HOSTED; + } else { + return TabletGoalState.UNASSIGNED; + } case MERGING: return TabletGoalState.UNASSIGNED; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index b9914bef1f..4ffeee967d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -72,6 +72,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -698,6 +699,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { ServerColumnFamily.TIME_COLUMN.fetch(scanner); scanner.fetchColumnFamily(DataFileColumnFamily.NAME); scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME); + scanner.fetchColumnFamily(LogColumnFamily.NAME); Set<ReferenceFile> datafilesAndDirs = new TreeSet<>(); for (Entry<Key,Value> entry : scanner) { Key key = entry.getKey(); @@ -710,9 +713,13 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { metadataTime = MetadataTime.parse(entry.getValue().toString()); - } else if (key.compareColumnFamily(CurrentLocationColumnFamily.NAME) == 0) { + } else if (isTabletAssigned(key)) { throw new IllegalStateException( "Tablet " + key.getRow() + " is assigned during a merge!"); + // Verify that Tablet has no WALs + } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { + throw new IllegalStateException( + "Tablet " + key.getRow() + " has walogs during a delete!"); } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { var allVolumesDirectory = new AllVolumesDirectory(extent.tableId(), 
entry.getValue().toString()); @@ -783,13 +790,25 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); ServerColumnFamily.TIME_COLUMN.fetch(scanner); ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME); + scanner.fetchColumnFamily(LogColumnFamily.NAME); Mutation m = new Mutation(stopRow); MetadataTime maxLogicalTime = null; for (Entry<Key,Value> entry : scanner) { Key key = entry.getKey(); Value value = entry.getValue(); - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + + // Verify that Tablet is offline + if (isTabletAssigned(key)) { + throw new IllegalStateException( + "Tablet " + key.getRow() + " is assigned during a merge!"); + // Verify that Tablet has no WALs + } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { + throw new IllegalStateException("Tablet " + key.getRow() + " has walogs during a merge!"); + } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { m.put(key.getColumnFamily(), key.getColumnQualifier(), value); fileCount++; } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) @@ -894,6 +913,11 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { bw.flush(); } + private boolean isTabletAssigned(Key key) { + return key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME) + || key.getColumnFamily().equals(FutureLocationColumnFamily.NAME); + } + private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { try { AccumuloClient client = manager.getContext(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java index df18c869da..5cecbfe98b 100644 --- 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java @@ -52,6 +52,9 @@ import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -201,6 +204,9 @@ public class MergeStats { private boolean verifyMergeConsistency(AccumuloClient accumuloClient, CurrentState manager) throws TableNotFoundException, IOException { + // The only expected state when this method is called is WAITING_FOR_OFFLINE + verifyState(info, MergeState.WAITING_FOR_OFFLINE); + MergeStats verify = new MergeStats(info); KeyExtent extent = info.getExtent(); Scanner scanner = accumuloClient @@ -230,8 +236,9 @@ public class MergeStats { break; } - if (!tls.walogs.isEmpty() && verify.getMergeInfo().needsToBeChopped(tls.extent)) { - log.debug("failing consistency: needs to be chopped {}", tls.extent); + // Verify that no WALs exist + if (!verifyWalogs(tls)) { + log.debug("failing consistency: {} has walogs {}", tls.extent, tls.walogs.size()); return false; } @@ -271,6 +278,17 @@ public class MergeStats { && unassigned == verify.total; } + @VisibleForTesting + void verifyState(MergeInfo info, MergeState expectedState) { + Preconditions.checkState(info.getState() == expectedState, "Unexpected merge state %s", + info.getState()); + } + + @VisibleForTesting + boolean verifyWalogs(TabletLocationState tls) { + return tls.walogs.isEmpty(); + } + public static void main(String[] args) throws Exception { ServerUtilOpts opts = new ServerUtilOpts(); opts.parseArgs(MergeStats.class.getName(), args); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java new file mode 
100644 index 0000000000..c9ad9763c0 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.state; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.TabletLocationState; +import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.server.manager.state.MergeInfo; +import org.apache.accumulo.server.manager.state.MergeInfo.Operation; +import org.apache.accumulo.server.manager.state.MergeState; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class MergeStatsTest { + + @Test + public void testVerifyState() { + KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), new Text("begin")); + MergeInfo mergeInfo = new MergeInfo(keyExtent, 
Operation.MERGE); + MergeStats stats = new MergeStats(mergeInfo); + mergeInfo.setState(MergeState.WAITING_FOR_OFFLINE); + + // Verify WAITING_FOR_OFFLINE does not throw an exception + stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE); + + // State is wrong so should throw exception + mergeInfo.setState(MergeState.WAITING_FOR_CHOPPED); + assertThrows(IllegalStateException.class, + () -> stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE)); + } + + @Test + public void testVerifyWalogs() throws BadLocationStateException { + KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), new Text("begin")); + MergeStats stats = new MergeStats(new MergeInfo(keyExtent, Operation.MERGE)); + + // Verify that verifyWalogs returns true if there are no walogs, else false + assertTrue(stats.verifyWalogs(getState(keyExtent, List.of()))); + assertFalse(stats.verifyWalogs(getState(keyExtent, List.of(List.of("log1"))))); + } + + private TabletLocationState getState(KeyExtent keyExtent, Collection<Collection<String>> walogs) + throws BadLocationStateException { + return new TabletLocationState(keyExtent, null, null, null, null, walogs, true); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index e36c8b8dd8..21173c3c5e 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ta import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.manager.state.MergeStats; import 
org.apache.accumulo.server.ServerContext; @@ -207,6 +208,24 @@ public class MergeStateIT extends ConfigurableMacBase { metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, Location.current(state.someTServer), null, null, walogs, false)), null); + // Add a walog which should keep the state from transitioning to MERGING + KeyExtent ke = new KeyExtent(tableId, new Text("t"), new Text("p")); + m = new Mutation(ke.toMetaRow()); + LogEntry logEntry = new LogEntry(ke, 100, "f1"); + m.at().family(logEntry.getColumnFamily()).qualifier(logEntry.getColumnQualifier()) + .timestamp(logEntry.timestamp).put(logEntry.getValue()); + update(accumuloClient, m); + + // Verify state is still WAITING_FOR_OFFLINE + stats = scan(state, metaDataStateStore); + newState = stats.nextMergeState(accumuloClient, state); + assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); + + // Delete the walog which will now allow a transition to MERGING + m = new Mutation(ke.toMetaRow()); + m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.timestamp); + update(accumuloClient, m); + // now we can split stats = scan(state, metaDataStateStore); assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, state));