This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new a2ce072e5c Make sure tablets with WALs are not merged (#3900)
a2ce072e5c is described below
commit a2ce072e5c75dc8347b6aa07375efdac5480b015
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Oct 27 18:42:26 2023 -0400
Make sure tablets with WALs are not merged (#3900)
Add extra verification to make sure that no tablets that are merged
contain any WALs to prevent losing data
Co-authored-by: Keith Turner <[email protected]>
---
.../java/org/apache/accumulo/manager/Manager.java | 6 ++
.../accumulo/manager/TabletGroupWatcher.java | 28 ++++++++-
.../apache/accumulo/manager/state/MergeStats.java | 22 ++++++-
.../accumulo/manager/state/MergeStatsTest.java | 71 ++++++++++++++++++++++
.../apache/accumulo/test/manager/MergeStateIT.java | 19 ++++++
5 files changed, 142 insertions(+), 4 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index b86cba576c..196b50d76c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -658,6 +658,12 @@ public class Manager extends AbstractServer
return TabletGoalState.HOSTED;
case WAITING_FOR_OFFLINE:
+ // If we have walogs we need to be HOSTED to recover
+ if (!tls.walogs.isEmpty()) {
+ return TabletGoalState.HOSTED;
+ } else {
+ return TabletGoalState.UNASSIGNED;
+ }
case MERGING:
return TabletGoalState.UNASSIGNED;
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index b9914bef1f..4ffeee967d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -72,6 +72,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -698,6 +699,8 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
ServerColumnFamily.TIME_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
Set<ReferenceFile> datafilesAndDirs = new TreeSet<>();
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
@@ -710,9 +713,13 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
}
} else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
metadataTime = MetadataTime.parse(entry.getValue().toString());
- } else if (key.compareColumnFamily(CurrentLocationColumnFamily.NAME)
== 0) {
+ } else if (isTabletAssigned(key)) {
throw new IllegalStateException(
"Tablet " + key.getRow() + " is assigned during a merge!");
+ // Verify that Tablet has no WALs
+ } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+ throw new IllegalStateException(
+ "Tablet " + key.getRow() + " has walogs during a delete!");
} else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
var allVolumesDirectory =
new AllVolumesDirectory(extent.tableId(),
entry.getValue().toString());
@@ -783,13 +790,25 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
ServerColumnFamily.TIME_COLUMN.fetch(scanner);
ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
Mutation m = new Mutation(stopRow);
MetadataTime maxLogicalTime = null;
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
Value value = entry.getValue();
- if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+
+ // Verify that Tablet is offline
+ if (isTabletAssigned(key)) {
+ throw new IllegalStateException(
+ "Tablet " + key.getRow() + " is assigned during a merge!");
+ // Verify that Tablet has no WALs
+ } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+ throw new IllegalStateException("Tablet " + key.getRow() + " has walogs during a merge!");
+ } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
fileCount++;
} else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
@@ -894,6 +913,11 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
bw.flush();
}
+ private boolean isTabletAssigned(Key key) {
+ return key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME)
+ || key.getColumnFamily().equals(FutureLocationColumnFamily.NAME);
+ }
+
private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
try {
AccumuloClient client = manager.getContext();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index df18c869da..5cecbfe98b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -52,6 +52,9 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
@@ -201,6 +204,9 @@ public class MergeStats {
private boolean verifyMergeConsistency(AccumuloClient accumuloClient,
CurrentState manager)
throws TableNotFoundException, IOException {
+ // The only expected state when this method is called is WAITING_FOR_OFFLINE
+ verifyState(info, MergeState.WAITING_FOR_OFFLINE);
+
MergeStats verify = new MergeStats(info);
KeyExtent extent = info.getExtent();
Scanner scanner = accumuloClient
@@ -230,8 +236,9 @@ public class MergeStats {
break;
}
- if (!tls.walogs.isEmpty() &&
verify.getMergeInfo().needsToBeChopped(tls.extent)) {
- log.debug("failing consistency: needs to be chopped {}", tls.extent);
+ // Verify that no WALs exist
+ if (!verifyWalogs(tls)) {
+ log.debug("failing consistency: {} has walogs {}", tls.extent,
tls.walogs.size());
return false;
}
@@ -271,6 +278,17 @@ public class MergeStats {
&& unassigned == verify.total;
}
+ @VisibleForTesting
+ void verifyState(MergeInfo info, MergeState expectedState) {
+ Preconditions.checkState(info.getState() == expectedState, "Unexpected merge state %s",
+ info.getState());
+ }
+
+ @VisibleForTesting
+ boolean verifyWalogs(TabletLocationState tls) {
+ return tls.walogs.isEmpty();
+ }
+
public static void main(String[] args) throws Exception {
ServerUtilOpts opts = new ServerUtilOpts();
opts.parseArgs(MergeStats.class.getName(), args);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
new file mode 100644
index 0000000000..c9ad9763c0
--- /dev/null
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.state;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.TabletLocationState;
+import
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.manager.state.MergeInfo;
+import org.apache.accumulo.server.manager.state.MergeInfo.Operation;
+import org.apache.accumulo.server.manager.state.MergeState;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class MergeStatsTest {
+
+ @Test
+ public void testVerifyState() {
+ KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"),
new Text("begin"));
+ MergeInfo mergeInfo = new MergeInfo(keyExtent, Operation.MERGE);
+ MergeStats stats = new MergeStats(mergeInfo);
+ mergeInfo.setState(MergeState.WAITING_FOR_OFFLINE);
+
+ // Verify WAITING_FOR_OFFLINE does not throw an exception
+ stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE);
+
+ // State is wrong so should throw exception
+ mergeInfo.setState(MergeState.WAITING_FOR_CHOPPED);
+ assertThrows(IllegalStateException.class,
+ () -> stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE));
+ }
+
+ @Test
+ public void testVerifyWalogs() throws BadLocationStateException {
+ KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"),
new Text("begin"));
+ MergeStats stats = new MergeStats(new MergeInfo(keyExtent,
Operation.MERGE));
+
+ // Verify that verifyWalogs returns true when there are no walogs, else false
+ assertTrue(stats.verifyWalogs(getState(keyExtent, List.of())));
+ assertFalse(stats.verifyWalogs(getState(keyExtent,
List.of(List.of("log1")))));
+ }
+
+ private TabletLocationState getState(KeyExtent keyExtent,
Collection<Collection<String>> walogs)
+ throws BadLocationStateException {
+ return new TabletLocationState(keyExtent, null, null, null, null, walogs,
true);
+ }
+
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
index e36c8b8dd8..21173c3c5e 100644
--- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
@@ -46,6 +46,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ta
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.manager.state.MergeStats;
import org.apache.accumulo.server.ServerContext;
@@ -207,6 +208,24 @@ public class MergeStateIT extends ConfigurableMacBase {
metaDataStateStore.unassign(Collections.singletonList(new
TabletLocationState(tablet, null,
Location.current(state.someTServer), null, null, walogs, false)),
null);
+ // Add a walog which should keep the state from transitioning to MERGING
+ KeyExtent ke = new KeyExtent(tableId, new Text("t"), new Text("p"));
+ m = new Mutation(ke.toMetaRow());
+ LogEntry logEntry = new LogEntry(ke, 100, "f1");
+
m.at().family(logEntry.getColumnFamily()).qualifier(logEntry.getColumnQualifier())
+ .timestamp(logEntry.timestamp).put(logEntry.getValue());
+ update(accumuloClient, m);
+
+ // Verify state is still WAITING_FOR_OFFLINE
+ stats = scan(state, metaDataStateStore);
+ newState = stats.nextMergeState(accumuloClient, state);
+ assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+ // Delete the walog which will now allow a transition to MERGING
+ m = new Mutation(ke.toMetaRow());
+ m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier(),
logEntry.timestamp);
+ update(accumuloClient, m);
+
// now we can split
stats = scan(state, metaDataStateStore);
assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient,
state));