This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new a2ce072e5c Make sure tablets with WALs are not merged (#3900)
a2ce072e5c is described below

commit a2ce072e5c75dc8347b6aa07375efdac5480b015
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Fri Oct 27 18:42:26 2023 -0400

    Make sure tablets with WALs are not merged (#3900)
    
    Add extra verification to make sure that no tablet being merged
    contains any WALs, to prevent data loss
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../java/org/apache/accumulo/manager/Manager.java  |  6 ++
 .../accumulo/manager/TabletGroupWatcher.java       | 28 ++++++++-
 .../apache/accumulo/manager/state/MergeStats.java  | 22 ++++++-
 .../accumulo/manager/state/MergeStatsTest.java     | 71 ++++++++++++++++++++++
 .../apache/accumulo/test/manager/MergeStateIT.java | 19 ++++++
 5 files changed, 142 insertions(+), 4 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index b86cba576c..196b50d76c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -658,6 +658,12 @@ public class Manager extends AbstractServer
 
               return TabletGoalState.HOSTED;
             case WAITING_FOR_OFFLINE:
+              // If we have walogs we need to be HOSTED to recover
+              if (!tls.walogs.isEmpty()) {
+                return TabletGoalState.HOSTED;
+              } else {
+                return TabletGoalState.UNASSIGNED;
+              }
             case MERGING:
               return TabletGoalState.UNASSIGNED;
           }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index b9914bef1f..4ffeee967d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -72,6 +72,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Cu
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -698,6 +699,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       ServerColumnFamily.TIME_COLUMN.fetch(scanner);
       scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
       scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(LogColumnFamily.NAME);
       Set<ReferenceFile> datafilesAndDirs = new TreeSet<>();
       for (Entry<Key,Value> entry : scanner) {
         Key key = entry.getKey();
@@ -710,9 +713,13 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
           }
         } else if (ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
           metadataTime = MetadataTime.parse(entry.getValue().toString());
-        } else if (key.compareColumnFamily(CurrentLocationColumnFamily.NAME) 
== 0) {
+        } else if (isTabletAssigned(key)) {
           throw new IllegalStateException(
               "Tablet " + key.getRow() + " is assigned during a merge!");
+          // Verify that Tablet has no WALs
+        } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+          throw new IllegalStateException(
+              "Tablet " + key.getRow() + " has walogs during a delete!");
         } else if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
           var allVolumesDirectory =
               new AllVolumesDirectory(extent.tableId(), 
entry.getValue().toString());
@@ -783,13 +790,25 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
       ServerColumnFamily.TIME_COLUMN.fetch(scanner);
       ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
+
       scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(LogColumnFamily.NAME);
       Mutation m = new Mutation(stopRow);
       MetadataTime maxLogicalTime = null;
       for (Entry<Key,Value> entry : scanner) {
         Key key = entry.getKey();
         Value value = entry.getValue();
-        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
+
+        // Verify that Tablet is offline
+        if (isTabletAssigned(key)) {
+          throw new IllegalStateException(
+              "Tablet " + key.getRow() + " is assigned during a merge!");
+          // Verify that Tablet has no WALs
+        } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+          throw new IllegalStateException("Tablet " + key.getRow() + " has 
walogs during a merge!");
+        } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
           m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
           fileCount++;
         } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
@@ -894,6 +913,11 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     bw.flush();
   }
 
+  /**
+   * Returns true if the metadata key belongs to the current-location or the future-location
+   * column family. Callers in this class treat such an entry as a tablet that is assigned while
+   * it is being merged or deleted, which is an illegal state.
+   *
+   * @param key a metadata table key read while scanning the tablets involved
+   * @return true if the key's column family is CurrentLocationColumnFamily or
+   *         FutureLocationColumnFamily
+   */
+  private boolean isTabletAssigned(Key key) {
+    return key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME)
+        || key.getColumnFamily().equals(FutureLocationColumnFamily.NAME);
+  }
+
   private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
     try {
       AccumuloClient client = manager.getContext();
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index df18c869da..5cecbfe98b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@ -52,6 +52,9 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
 
@@ -201,6 +204,9 @@ public class MergeStats {
 
   private boolean verifyMergeConsistency(AccumuloClient accumuloClient, 
CurrentState manager)
       throws TableNotFoundException, IOException {
+    // The only expected state when this method is called is 
WAITING_FOR_OFFLINE
+    verifyState(info, MergeState.WAITING_FOR_OFFLINE);
+
     MergeStats verify = new MergeStats(info);
     KeyExtent extent = info.getExtent();
     Scanner scanner = accumuloClient
@@ -230,8 +236,9 @@ public class MergeStats {
         break;
       }
 
-      if (!tls.walogs.isEmpty() && 
verify.getMergeInfo().needsToBeChopped(tls.extent)) {
-        log.debug("failing consistency: needs to be chopped {}", tls.extent);
+      // Verify that no WALs exist
+      if (!verifyWalogs(tls)) {
+        log.debug("failing consistency: {} has walogs {}", tls.extent, 
tls.walogs.size());
         return false;
       }
 
@@ -271,6 +278,17 @@ public class MergeStats {
         && unassigned == verify.total;
   }
 
+  /**
+   * Checks that a merge is in the expected state before continuing.
+   *
+   * @param info the merge whose current state is checked
+   * @param expectedState the state the merge is required to be in
+   * @throws IllegalStateException if {@code info.getState()} is not {@code expectedState}
+   */
+  @VisibleForTesting
+  void verifyState(MergeInfo info, MergeState expectedState) {
+    Preconditions.checkState(info.getState() == expectedState, "Unexpected 
merge state %s",
+        info.getState());
+  }
+
+  /**
+   * Checks that a tablet has no write-ahead logs. Note the polarity: this returns {@code true}
+   * when the tablet has NO walogs (the check passes) and {@code false} when at least one walog is
+   * present. In the merge consistency check, a {@code false} result fails the check.
+   *
+   * @param tls location state of the tablet being checked
+   * @return true if {@code tls.walogs} is empty, false otherwise
+   */
+  @VisibleForTesting
+  boolean verifyWalogs(TabletLocationState tls) {
+    return tls.walogs.isEmpty();
+  }
+
   public static void main(String[] args) throws Exception {
     ServerUtilOpts opts = new ServerUtilOpts();
     opts.parseArgs(MergeStats.class.getName(), args);
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
new file mode 100644
index 0000000000..c9ad9763c0
--- /dev/null
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.state;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.TabletLocationState;
+import 
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
+import org.apache.accumulo.server.manager.state.MergeInfo;
+import org.apache.accumulo.server.manager.state.MergeInfo.Operation;
+import org.apache.accumulo.server.manager.state.MergeState;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the state and walog checks that MergeStats performs during merge consistency
+ * verification.
+ */
+public class MergeStatsTest {
+
+  /**
+   * verifyState must accept a merge in the expected state and throw IllegalStateException
+   * otherwise.
+   */
+  @Test
+  public void testVerifyState() {
+    KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), 
new Text("begin"));
+    MergeInfo mergeInfo = new MergeInfo(keyExtent, Operation.MERGE);
+    MergeStats stats = new MergeStats(mergeInfo);
+    mergeInfo.setState(MergeState.WAITING_FOR_OFFLINE);
+
+    // Verify WAITING_FOR_OFFLINE does not throw an exception
+    stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE);
+
+    // State no longer matches the expected state, so verifyState must throw
+    mergeInfo.setState(MergeState.WAITING_FOR_CHOPPED);
+    assertThrows(IllegalStateException.class,
+        () -> stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE));
+  }
+
+  /**
+   * verifyWalogs must pass (return true) only for a tablet without walogs.
+   */
+  @Test
+  public void testVerifyWalogs() throws BadLocationStateException {
+    KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), 
new Text("begin"));
+    MergeStats stats = new MergeStats(new MergeInfo(keyExtent, 
Operation.MERGE));
+
+    // verifyWalogs returns true when there are no walogs, and false when any walog exists
+    assertTrue(stats.verifyWalogs(getState(keyExtent, List.of())));
+    assertFalse(stats.verifyWalogs(getState(keyExtent, 
List.of(List.of("log1")))));
+  }
+
+  /**
+   * Builds a TabletLocationState for {@code keyExtent} carrying the given walogs. The other
+   * constructor arguments are fixed (four nulls and {@code true}); presumably these are the
+   * location fields and a chopped flag, but confirm against TabletLocationState.
+   */
+  private TabletLocationState getState(KeyExtent keyExtent, 
Collection<Collection<String>> walogs)
+      throws BadLocationStateException {
+    return new TabletLocationState(keyExtent, null, null, null, null, walogs, 
true);
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java 
b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
index e36c8b8dd8..21173c3c5e 100644
--- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
@@ -46,6 +46,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ta
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.manager.state.MergeStats;
 import org.apache.accumulo.server.ServerContext;
@@ -207,6 +208,24 @@ public class MergeStateIT extends ConfigurableMacBase {
       metaDataStateStore.unassign(Collections.singletonList(new 
TabletLocationState(tablet, null,
           Location.current(state.someTServer), null, null, walogs, false)), 
null);
 
+      // Add a walog which should keep the state from transitioning to MERGING
+      KeyExtent ke = new KeyExtent(tableId, new Text("t"), new Text("p"));
+      m = new Mutation(ke.toMetaRow());
+      LogEntry logEntry = new LogEntry(ke, 100, "f1");
+      
m.at().family(logEntry.getColumnFamily()).qualifier(logEntry.getColumnQualifier())
+          .timestamp(logEntry.timestamp).put(logEntry.getValue());
+      update(accumuloClient, m);
+
+      // Verify state is still WAITING_FOR_OFFLINE
+      stats = scan(state, metaDataStateStore);
+      newState = stats.nextMergeState(accumuloClient, state);
+      assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+      // Delete the walog which will now allow a transition to MERGING
+      m = new Mutation(ke.toMetaRow());
+      m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), 
logEntry.timestamp);
+      update(accumuloClient, m);
+
       // now we can split
       stats = scan(state, metaDataStateStore);
       assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, 
state));

Reply via email to