This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 36c9740299 Checks for corruption earlier and always report errors 
(#5227)
36c9740299 is described below

commit 36c9740299361874769cb18683c5130153dc488c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Jan 8 11:07:16 2025 -0500

    Checks for corruption earlier and always report errors (#5227)
    
    Was looking into an issue where a mutation was corrupted on the network.
    This caused the mutation to be written to the write ahead log and then a
    failure occurred in the tablet server which left the tablet in an
    inconsistent state.  Modified the tablet server code to deserialize
    mutations as early as possible (it used to deserialize after writing to
    the walog, now it does it before).
    
    Wrote an IT to recreate this problem and found another bug.  Writing
    data to Accumulo does the following.
    
     1. Make a startUpdate RPC to create an update session
     2. Make one or more applyUpdates RPCs to add data to the session.  These
        RPCs are thrift oneway calls, so nothing is reported back.
     3. Call closeUpdate on the session to see what happened with all of the
        applyUpdates RPCs done in step 2.
    
    If an unexpected exception happened in step 2 above then it would not be
    reported back to the client.  These changes fix and test that as part of
    testing the corrupt mutation. After these changes if there was an error
    in step 2, then step 3 now throws an exception.
---
 .../accumulo/tserver/TabletClientHandler.java      |  32 ++++-
 .../accumulo/tserver/session/UpdateSession.java    |   1 +
 .../apache/accumulo/test/CorruptMutationIT.java    | 149 +++++++++++++++++++++
 3 files changed, 180 insertions(+), 2 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 087c733e6f..c6b3873902 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -267,8 +267,8 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
     if (us.currentTablet != null && 
us.currentTablet.getExtent().equals(keyExtent)) {
       return;
     }
-    if (us.currentTablet == null
-        && (us.failures.containsKey(keyExtent) || 
us.authFailures.containsKey(keyExtent))) {
+    if (us.currentTablet == null && (us.failures.containsKey(keyExtent)
+        || us.authFailures.containsKey(keyExtent) || us.unhandledException != 
null)) {
       // if there were previous failures, then do not accept additional writes
       return;
     }
@@ -339,6 +339,11 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
         List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
         for (TMutation tmutation : tmutations) {
           Mutation mutation = new ServerMutation(tmutation);
+          // Deserialize the mutation in an attempt to check for data 
corruption that happened on
+          // the network. This will avoid writing a corrupt mutation to the 
write ahead log and
+          // failing after its written to the write ahead log when it is 
deserialized to update the
+          // in memory map.
+          mutation.getUpdates();
           mutations.add(mutation);
           additionalMutationSize += mutation.numBytes();
         }
@@ -358,6 +363,15 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
           }
         }
       }
+    } catch (RuntimeException e) {
+      // This method is a thrift oneway method so an exception from it will 
not make it back to the
+      // client. Need to record the exception and set the session such that 
any future updates to
+      // the session are ignored.
+      us.unhandledException = e;
+      us.currentTablet = null;
+
+      // Rethrowing it will cause logging from thrift, so not adding logging 
here.
+      throw e;
     } finally {
       if (reserved) {
         server.sessionManager.unreserveSession(us);
@@ -541,6 +555,20 @@ public class TabletClientHandler implements 
TabletClientService.Iface {
     }
 
     try {
+      if (us.unhandledException != null) {
+        // Since flush() is not being called, any memory added to the global 
queued mutations
+        // counter will not be decremented. So do that here before throwing an 
exception.
+        server.updateTotalQueuedMutationSize(-us.queuedMutationSize);
+        us.queuedMutationSize = 0;
+        // make this memory available for GC
+        us.queuedMutations.clear();
+
+        // Something unexpected happened during this write session, so throw 
an exception here to
+        // cause a TApplicationException on the client side.
+        throw new IllegalStateException(
+            "Write session " + updateID + " saw an unexpected exception", 
us.unhandledException);
+      }
+
       // clients may or may not see data from an update session while
       // it is in progress, however when the update session is closed
       // want to ensure that reads wait for the write to finish
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
index ca487294d7..dc32765c28 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/UpdateSession.java
@@ -50,6 +50,7 @@ public class UpdateSession extends Session {
   public long flushTime = 0;
   public long queuedMutationSize = 0;
   public final Durability durability;
+  public Exception unhandledException = null;
 
   public UpdateSession(TservConstraintEnv env, TCredentials credentials, 
Durability durability) {
     super(credentials);
diff --git a/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java 
b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java
new file mode 100644
index 0000000000..552cd46b33
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/CorruptMutationIT.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TServiceClient;
+import org.junit.jupiter.api.Test;
+
+public class CorruptMutationIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "10");
+  }
+
+  @Test
+  public void testCorruptMutation() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().create(table);
+      try (BatchWriter writer = c.createBatchWriter(table)) {
+        Mutation m = new Mutation("1");
+        m.put("f1", "q1", new Value("v1"));
+        writer.addMutation(m);
+      }
+
+      var ctx = (ClientContext) c;
+      var tableId = ctx.getTableId(table);
+      var extent = new KeyExtent(tableId, null, null);
+      var tabletMetadata = ctx.getAmple().readTablet(extent, 
TabletMetadata.ColumnType.LOCATION);
+      var location = tabletMetadata.getLocation();
+      assertNotNull(location);
+      assertEquals(TabletMetadata.LocationType.CURRENT, location.getType());
+
+      TabletClientService.Iface client =
+          ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, 
location.getHostAndPort(), ctx);
+      // Make the same RPC calls made by the BatchWriter, but pass a corrupt 
serialized mutation in
+      // this try block.
+      try {
+        TInfo tinfo = TraceUtil.traceInfo();
+
+        long sessionId = client.startUpdate(tinfo, ctx.rpcCreds(), 
TDurability.DEFAULT);
+
+        // Write two valid mutations to the session. The tserver buffers data 
it receives via
+        // applyUpdates and may not write them until closeUpdate RPC is 
called. Because
+        // TSERV_TOTAL_MUTATION_QUEUE_MAX was set so small, these values 
should be written.
+        client.applyUpdates(tinfo, sessionId, extent.toThrift(),
+            List.of(createTMutation("abc", "z1"), createTMutation("def", 
"z2")));
+
+        // Simulate data corruption in the serialized mutation
+        TMutation badMutation = createTMutation("ghi", "z3");
+        badMutation.entries = -42;
+
+        // Write some good and bad mutations to the session. The server side 
will see an error here,
+        // however since this is a thrift oneway method no exception is 
expected here. This should
+        // leave the session in a broken state where it no longer accepts any 
new data.
+        client.applyUpdates(tinfo, sessionId, extent.toThrift(),
+            List.of(createTMutation("jkl", "z4"), badMutation, 
createTMutation("mno", "z5")));
+
+        // Write two more valid mutations to the session, these should be 
dropped on the server side
+        // because of the previous error. So should never see these updates.
+        client.applyUpdates(tinfo, sessionId, extent.toThrift(),
+            List.of(createTMutation("pqr", "z6"), createTMutation("stu", 
"z7")));
+
+        // Since client.applyUpdates experienced an error, should see an error 
when closing the
+        // session.
+        assertThrows(TApplicationException.class, () -> 
client.closeUpdate(tinfo, sessionId));
+      } finally {
+        ThriftUtil.returnClient((TServiceClient) client, ctx);
+      }
+
+      // The values that a scan must see
+      Set<String> expectedValues = Set.of("v1", "v2", "z1", "z2");
+
+      // The failed mutation should not have left the tablet in a bad state. 
Do some follow-on
+      // actions to ensure the tablet is still functional.
+      try (BatchWriter writer = c.createBatchWriter(table)) {
+        Mutation m = new Mutation("2");
+        m.put("f1", "q1", new Value("v2"));
+        writer.addMutation(m);
+      }
+
+      try (Scanner scanner = c.createScanner(table)) {
+        var valuesSeen =
+            scanner.stream().map(e -> 
e.getValue().toString()).collect(Collectors.toSet());
+        assertEquals(expectedValues, valuesSeen);
+      }
+
+      c.tableOperations().flush(table, null, null, true);
+
+      try (Scanner scanner = c.createScanner(table)) {
+        var valuesSeen =
+            scanner.stream().map(e -> 
e.getValue().toString()).collect(Collectors.toSet());
+        assertEquals(expectedValues, valuesSeen);
+      }
+    }
+  }
+
+  private static TMutation createTMutation(String row, String value) {
+    Mutation m = new Mutation(row);
+    m.put("x", "y", value);
+    return m.toThrift();
+  }
+}

Reply via email to